Skip to content

Commit f596c69

Browse files
committed
pulse: connect recording stream only when recording is allowed
This makes streams list in pulseaudio/pipewire less cluttered (no streams that wouldn't really record), but also fixes recording indicator in xfce (which looks at connected streams, regardless of their corked state). Move stream creation to a separate function (connect_disconnect_rec_stream) and call it instead of just uncorking the stream. And also, still call it on pulseaudio connection, if recording was allowed initially. Since the stream connection is asynchronous, the "allow recording" logic is split into two parts, and both are put into connect_disconnect_rec_stream - selected by the state_callback parameter. This mechanism is a bit fragile, as there are quite a few states. Introduce rec_stream_connect_in_progress variable that is set when stream connect/disconnect is in progress. In that case, do not schedule another connect/disconnect, but at stream state change callback will check if the requested state didn't change in the meantime. This also means that if rec_stream_connect_in_progress is set, the rec_allowed value is unreliable (it may be a stream that is created but not connected yet for example) - so, if rec_stream_connect_in_progress is set, do not send any data. Since recording stream isn't initially connected now, decouple adding recording vchan callback from it - since that vchan is also used for commands. And adjust that callback to not assume the stream is always connected (do not use pa_stream_is_corked, don't assert for stream before checking if recording is allowed etc). And also drop pa_stream_cork in vchan error hander - the process quits few lines below anyway. Fixes QubesOS/qubes-issues#9999
1 parent b8dc96c commit f596c69

File tree

2 files changed

+114
-55
lines changed

2 files changed

+114
-55
lines changed

pulse/pacat-simple-vchan.c

Lines changed: 112 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ const pa_buffer_attr * bufattr = NULL;
9292

9393
static int verbose = 1;
9494

95+
static bool connect_disconnect_rec_stream(
96+
struct userdata *u, bool state_callback, bool new_rec_allowed);
97+
9598
static void context_state_callback(pa_context *c, void *userdata);
9699

97100
static void playback_stream_drain(struct userdata *u);
@@ -354,12 +357,13 @@ static void send_rec_data(pa_stream *s, struct userdata *u, bool discard_overrun
354357
size_t rec_buffer_length, rec_buffer_index = 0;
355358
int l, vchan_buffer_space;
356359

357-
assert(s);
358360
assert(u);
359361

360-
if (!u->rec_allowed)
362+
if (!u->rec_allowed || u->rec_stream_connect_in_progress)
361363
return;
362364

365+
assert(s);
366+
363367
if (pa_stream_readable_size(s) <= 0)
364368
return;
365369

@@ -448,8 +452,6 @@ static void vchan_rec_callback(pa_mainloop_api *UNUSED(a),
448452
uint32_t cmd;
449453
while (libvchan_data_ready(u->rec_ctrl) >= (int)sizeof(cmd)) {
450454
if (libvchan_read(u->rec_ctrl, (char*)&cmd, sizeof(cmd)) != sizeof(cmd)) {
451-
if (!pa_stream_is_corked(u->rec_stream))
452-
pa_stream_cork(u->rec_stream, 1, NULL, u);
453455
fprintf(stderr, "Failed to read from vchan\n");
454456
quit(u, 1);
455457
return;
@@ -461,10 +463,10 @@ static void vchan_rec_callback(pa_mainloop_api *UNUSED(a),
461463
if (!qdb_write(u->qdb, u->qdb_request_path, "1", 1)) {
462464
pacat_log("Failed to write QubesDB %s: %s", u->qdb_request_path, strerror(errno));
463465
}
464-
if (u->rec_allowed) {
466+
if (u->rec_allowed && !u->rec_stream_connect_in_progress) {
465467
pacat_log("Recording start");
466468
pa_stream_cork(u->rec_stream, 0, NULL, u);
467-
} else
469+
} else if (!u->rec_allowed)
468470
pacat_log("Recording requested but not allowed");
469471
g_mutex_unlock(&u->prop_mutex);
470472
break;
@@ -474,13 +476,17 @@ static void vchan_rec_callback(pa_mainloop_api *UNUSED(a),
474476
if (!qdb_write(u->qdb, u->qdb_request_path, "0", 1)) {
475477
pacat_log("Failed to write QubesDB %s: %s", u->qdb_request_path, strerror(errno));
476478
}
477-
if (!pa_stream_is_corked(u->rec_stream)) {
479+
if (u->rec_allowed && !u->rec_stream_connect_in_progress) {
478480
pacat_log("Recording stop");
479481
pa_stream_cork(u->rec_stream, 1, NULL, u);
480482
}
481483
g_mutex_unlock(&u->prop_mutex);
482484
break;
483485
case QUBES_PA_SINK_CORK_CMD:
486+
if (pa_stream_is_corked(u->play_stream)) {
487+
pacat_log("Stream already corked");
488+
break;
489+
}
484490
u->pending_play_cork = true;
485491
if (libvchan_data_ready(u->play_ctrl) > 0) {
486492
pacat_log("Deferred stream drain");
@@ -520,10 +526,11 @@ static void stream_state_callback(pa_stream *s, void *userdata) {
520526
assert(u->mainloop_api);
521527
u->mainloop_api->io_free(u->play_stdio_event);
522528
}
523-
if (u->rec_stdio_event && u->rec_stream == s) {
529+
if (u->rec_stream == s) {
524530
pacat_log("rec stream terminated");
525-
assert(u->mainloop_api);
526-
u->mainloop_api->io_free(u->rec_stdio_event);
531+
pa_stream_unref(u->rec_stream);
532+
u->rec_stream = NULL;
533+
connect_disconnect_rec_stream(u, true, false);
527534
}
528535
break;
529536

@@ -559,11 +566,11 @@ static void stream_state_callback(pa_stream *s, void *userdata) {
559566
}
560567
}
561568
if (u->rec_stream == s) {
562-
u->rec_stdio_event = u->mainloop_api->io_new(u->mainloop_api,
563-
libvchan_fd_for_select(u->rec_ctrl), PA_IO_EVENT_INPUT, vchan_rec_callback, u);
564-
if (!u->rec_stdio_event) {
565-
pacat_log("io_new rec failed");
566-
quit(u, 1);
569+
if (connect_disconnect_rec_stream(u, true, true)) {
570+
if (u->rec_requested) {
571+
pacat_log("Recording start");
572+
pa_stream_cork(u->rec_stream, 0, NULL, NULL);
573+
}
567574
}
568575
}
569576
break;
@@ -638,13 +645,84 @@ static void stream_event_callback(pa_stream *s, const char *name, pa_proplist *p
638645
pa_xfree(t);
639646
}
640647

648+
/*
649+
* Connect/disconnect rec stream based on new_rec_allowed. This can be called
650+
* from a state callback (if state_callback=true) to finalize
651+
* connecting/disconnecting.
652+
* Returns if stream is in the desired state.
653+
*/
654+
static bool connect_disconnect_rec_stream_locked(
655+
struct userdata *u, bool state_callback, bool new_rec_allowed)
656+
{
657+
if (state_callback) {
658+
if (new_rec_allowed == u->rec_allowed) {
659+
if (!qdb_write(u->qdb, u->qdb_status_path, u->rec_allowed ? "1" : "0", 1)) {
660+
pacat_log("Failed to write QubesDB %s: %s", u->qdb_status_path, strerror(errno));
661+
}
662+
u->rec_stream_connect_in_progress = false;
663+
return true;
664+
}
665+
}
666+
u->rec_stream_connect_in_progress = true;
667+
if (new_rec_allowed) {
668+
pa_stream_flags_t flags =
669+
PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY;
670+
671+
/* setup recording stream */
672+
assert(!u->rec_stream);
673+
674+
if (!(u->rec_stream = pa_stream_new_with_proplist(u->context, "record", &sample_spec, &u->channel_map, u->proplist))) {
675+
pacat_log("rec pa_stream_new() failed: %s", pa_strerror(pa_context_errno(u->context)));
676+
quit(u, 1);
677+
}
678+
679+
pa_stream_set_state_callback(u->rec_stream, stream_state_callback, u);
680+
/* pa_stream_set_write_callback */
681+
pa_stream_set_read_callback(u->rec_stream, stream_read_callback, u);
682+
pa_stream_set_suspended_callback(u->rec_stream, stream_suspended_callback, u);
683+
pa_stream_set_moved_callback(u->rec_stream, stream_moved_callback, u);
684+
pa_stream_set_underflow_callback(u->rec_stream, stream_underflow_callback, u);
685+
pa_stream_set_overflow_callback(u->rec_stream, stream_overflow_callback, u);
686+
pa_stream_set_started_callback(u->rec_stream, stream_started_callback, u);
687+
pa_stream_set_event_callback(u->rec_stream, stream_event_callback, u);
688+
pa_stream_set_buffer_attr_callback(u->rec_stream, stream_buffer_attr_callback, u);
689+
690+
if (pa_stream_connect_record(u->rec_stream,
691+
u->rec_device,
692+
bufattr,
693+
flags) < 0) {
694+
pacat_log("pa_stream_connect_record() failed: %s",
695+
pa_strerror(pa_context_errno(u->context)));
696+
u->rec_allowed = false;
697+
u->rec_stream_connect_in_progress = false;
698+
}
699+
} else {
700+
if (pa_stream_disconnect(u->rec_stream) < 0) {
701+
pacat_log("pa_stream_disconnect() failed: %s",
702+
pa_strerror(pa_context_errno(u->context)));
703+
u->rec_stream_connect_in_progress = false;
704+
}
705+
}
706+
return false;
707+
}
708+
709+
static bool connect_disconnect_rec_stream(
710+
struct userdata *u, bool state_callback, bool new_rec_allowed)
711+
{
712+
bool ret;
713+
714+
g_mutex_lock(&u->prop_mutex);
715+
ret = connect_disconnect_rec_stream_locked(u, state_callback, new_rec_allowed);
716+
g_mutex_unlock(&u->prop_mutex);
717+
return ret;
718+
}
719+
641720

642721

643722
/* This is called whenever the context status changes */
644723
static void context_state_callback(pa_context *c, void *userdata) {
645724
struct userdata *u = userdata;
646725
pa_stream_flags_t flags = 0;
647-
pa_channel_map channel_map;
648726

649727
assert(c);
650728

@@ -656,9 +734,9 @@ static void context_state_callback(pa_context *c, void *userdata) {
656734

657735
case PA_CONTEXT_READY:
658736

659-
pa_channel_map_init_extend(&channel_map, sample_spec.channels, PA_CHANNEL_MAP_DEFAULT);
737+
pa_channel_map_init_extend(&u->channel_map, sample_spec.channels, PA_CHANNEL_MAP_DEFAULT);
660738

661-
if (!pa_channel_map_compatible(&channel_map, &sample_spec)) {
739+
if (!pa_channel_map_compatible(&u->channel_map, &sample_spec)) {
662740
pacat_log("Channel map doesn't match sample specification");
663741
goto fail;
664742
}
@@ -669,7 +747,7 @@ static void context_state_callback(pa_context *c, void *userdata) {
669747
if (verbose)
670748
pacat_log("Connection established.%s", CLEAR_LINE);
671749

672-
if (!(u->play_stream = pa_stream_new_with_proplist(c, "playback", &sample_spec, &channel_map, u->proplist))) {
750+
if (!(u->play_stream = pa_stream_new_with_proplist(c, "playback", &sample_spec, &u->channel_map, u->proplist))) {
673751
pacat_log("play pa_stream_new() failed: %s", pa_strerror(pa_context_errno(c)));
674752
goto fail;
675753
}
@@ -693,37 +771,25 @@ static void context_state_callback(pa_context *c, void *userdata) {
693771
goto fail;
694772
}
695773

696-
/* setup recording stream */
697-
assert(!u->rec_stream);
698-
699-
if (!(u->rec_stream = pa_stream_new_with_proplist(c, "record", &sample_spec, &channel_map, u->proplist))) {
700-
pacat_log("rec pa_stream_new() failed: %s", pa_strerror(pa_context_errno(c)));
701-
goto fail;
702-
}
703-
704-
pa_stream_set_state_callback(u->rec_stream, stream_state_callback, u);
705-
/* pa_stream_set_write_callback */
706-
pa_stream_set_read_callback(u->rec_stream, stream_read_callback, u);
707-
pa_stream_set_suspended_callback(u->rec_stream, stream_suspended_callback, u);
708-
pa_stream_set_moved_callback(u->rec_stream, stream_moved_callback, u);
709-
pa_stream_set_underflow_callback(u->rec_stream, stream_underflow_callback, u);
710-
pa_stream_set_overflow_callback(u->rec_stream, stream_overflow_callback, u);
711-
pa_stream_set_started_callback(u->rec_stream, stream_started_callback, u);
712-
pa_stream_set_event_callback(u->rec_stream, stream_event_callback, u);
713-
pa_stream_set_buffer_attr_callback(u->rec_stream, stream_buffer_attr_callback, u);
714-
715-
flags = PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY;
716774
u->rec_requested = 0;
717775

718-
if (pa_stream_connect_record(u->rec_stream, u->rec_device, bufattr, flags) < 0) {
719-
pacat_log("pa_stream_connect_record() failed: %s", pa_strerror(pa_context_errno(c)));
720-
goto fail;
776+
/* and start watching for recording requests */
777+
u->rec_stdio_event = u->mainloop_api->io_new(u->mainloop_api,
778+
libvchan_fd_for_select(u->rec_ctrl), PA_IO_EVENT_INPUT, vchan_rec_callback, u);
779+
if (!u->rec_stdio_event) {
780+
pacat_log("io_new rec failed");
781+
quit(u, 1);
721782
}
722-
783+
if (u->rec_allowed)
784+
connect_disconnect_rec_stream(u, false, u->rec_allowed);
723785
break;
724786

725787
case PA_CONTEXT_TERMINATED:
726788
pacat_log("pulseaudio connection terminated");
789+
if (u->rec_stdio_event) {
790+
assert(u->mainloop_api);
791+
u->mainloop_api->io_free(u->rec_stdio_event);
792+
}
727793
quit(u, 0);
728794
break;
729795

@@ -865,18 +931,9 @@ static void control_socket_callback(pa_mainloop_api *UNUSED(a),
865931
g_mutex_lock(&u->prop_mutex);
866932
if (new_rec_allowed != u->rec_allowed) {
867933
u->rec_allowed = new_rec_allowed;
934+
if (!u->rec_stream_connect_in_progress)
935+
connect_disconnect_rec_stream_locked(u, false, new_rec_allowed);
868936
pacat_log("Setting audio-input to %s", u->rec_allowed ? "enabled" : "disabled");
869-
if (u->rec_allowed && u->rec_requested) {
870-
pacat_log("Recording start");
871-
pa_stream_cork(u->rec_stream, 0, NULL, NULL);
872-
} else if (!u->rec_allowed && u->rec_stream &&
873-
(u->rec_requested || !pa_stream_is_corked(u->rec_stream))) {
874-
pacat_log("Recording stop");
875-
pa_stream_cork(u->rec_stream, 1, NULL, NULL);
876-
}
877-
if (!qdb_write(u->qdb, u->qdb_status_path, new_rec_allowed ? "1" : "0", 1)) {
878-
pacat_log("Failed to write QubesDB %s: %s", u->qdb_status_path, strerror(errno));
879-
}
880937
}
881938
g_mutex_unlock(&u->prop_mutex);
882939
}
@@ -929,7 +986,7 @@ static int setup_control(struct userdata *u) {
929986
if (socket_fd < 0)
930987
goto fail;
931988

932-
rec_allowed = is_rec_allowed_from_qdb(u);
989+
rec_allowed = is_rec_allowed_from_qdb(u);
933990
if (rec_allowed >= 0) {
934991
pacat_log("mic allowed: initial value read from Qubes DB '%d'", rec_allowed);
935992
u->rec_allowed = rec_allowed;

pulse/pacat-simple-vchan.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ struct userdata {
3030
pa_io_event* play_ctrl_event;
3131
pa_io_event* rec_ctrl_event;
3232

33+
pa_channel_map channel_map;
3334
GMutex prop_mutex;
3435
qdb_handle_t qdb;
3536
qdb_handle_t watch_qdb; // separate connection for watches
@@ -38,6 +39,7 @@ struct userdata {
3839
char *qdb_request_path;
3940
int control_socket_fd;
4041
pa_io_event* control_socket_event;
42+
bool rec_stream_connect_in_progress;
4143
bool rec_allowed;
4244
bool rec_requested;
4345
bool never_block;

0 commit comments

Comments
 (0)