|
1 | | -use std::cell::Cell; |
2 | | -use std::collections::HashMap; |
3 | | -use std::rc::Rc; |
4 | | -use std::sync::{Arc, Mutex, Weak}; |
5 | | -use std::thread; |
6 | | - |
7 | | -use ::pipewire::{ |
8 | | - context::Context, keys, main_loop::MainLoop, properties::properties, spa::utils::dict::DictRef, |
9 | | - types::ObjectType, |
10 | | -}; |
11 | 1 | use itertools::Itertools as _; |
12 | | -use tokio::sync::Notify; |
| 2 | +use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel}; |
13 | 3 |
|
14 | 4 | use super::*; |
15 | | - |
16 | | -static CLIENT: LazyLock<Result<Client>> = LazyLock::new(Client::new); |
17 | | - |
18 | | -#[derive(Debug)] |
19 | | -struct Node { |
20 | | - name: String, |
21 | | - nick: Option<String>, |
22 | | - media_class: Option<String>, |
23 | | - media_role: Option<String>, |
24 | | - description: Option<String>, |
25 | | -} |
26 | | - |
27 | | -impl Node { |
28 | | - fn new(global_id: u32, global_props: &DictRef) -> Self { |
29 | | - Self { |
30 | | - name: global_props |
31 | | - .get(&keys::NODE_NAME) |
32 | | - .map_or_else(|| format!("node_{}", global_id), |s| s.to_string()), |
33 | | - nick: global_props.get(&keys::NODE_NICK).map(|s| s.to_string()), |
34 | | - media_class: global_props.get(&keys::MEDIA_CLASS).map(|s| s.to_string()), |
35 | | - media_role: global_props.get(&keys::MEDIA_ROLE).map(|s| s.to_string()), |
36 | | - description: global_props |
37 | | - .get(&keys::NODE_DESCRIPTION) |
38 | | - .map(|s| s.to_string()), |
39 | | - } |
40 | | - } |
41 | | -} |
42 | | - |
43 | | -#[derive(Debug, PartialEq, PartialOrd, Eq, Ord)] |
44 | | -struct Link { |
45 | | - link_output_node: u32, |
46 | | - link_input_node: u32, |
47 | | -} |
48 | | - |
49 | | -impl Link { |
50 | | - fn new(global_props: &DictRef) -> Option<Self> { |
51 | | - if let (Some(link_output_node), Some(link_input_node)) = ( |
52 | | - global_props |
53 | | - .get(&keys::LINK_OUTPUT_NODE) |
54 | | - .and_then(|s| s.parse().ok()), |
55 | | - global_props |
56 | | - .get(&keys::LINK_INPUT_NODE) |
57 | | - .and_then(|s| s.parse().ok()), |
58 | | - ) { |
59 | | - Some(Self { |
60 | | - link_output_node, |
61 | | - link_input_node, |
62 | | - }) |
63 | | - } else { |
64 | | - None |
65 | | - } |
66 | | - } |
67 | | -} |
68 | | - |
69 | | -#[derive(Default)] |
70 | | -struct Data { |
71 | | - nodes: HashMap<u32, Node>, |
72 | | - links: HashMap<u32, Link>, |
73 | | -} |
74 | | - |
75 | | -#[derive(Default)] |
76 | | -struct Client { |
77 | | - event_listeners: Mutex<Vec<Weak<Notify>>>, |
78 | | - data: Mutex<Data>, |
79 | | -} |
80 | | - |
81 | | -impl Client { |
82 | | - fn new() -> Result<Client> { |
83 | | - thread::Builder::new() |
84 | | - .name("privacy_pipewire".to_string()) |
85 | | - .spawn(Client::main_loop_thread) |
86 | | - .error("failed to spawn a thread")?; |
87 | | - |
88 | | - Ok(Client::default()) |
89 | | - } |
90 | | - |
91 | | - fn main_loop_thread() { |
92 | | - let client = CLIENT.as_ref().error("Could not get client").unwrap(); |
93 | | - |
94 | | - let proplist = properties! {*keys::APP_NAME => env!("CARGO_PKG_NAME")}; |
95 | | - |
96 | | - let main_loop = MainLoop::new(None).expect("Failed to create main loop"); |
97 | | - |
98 | | - let context = |
99 | | - Context::with_properties(&main_loop, proplist).expect("Failed to create context"); |
100 | | - let core = context.connect(None).expect("Failed to connect"); |
101 | | - let registry = core.get_registry().expect("Failed to get registry"); |
102 | | - |
103 | | - let updated = Rc::new(Cell::new(false)); |
104 | | - let updated_copy = updated.clone(); |
105 | | - let updated_copy2 = updated.clone(); |
106 | | - |
107 | | - // Register a callback to the `global` event on the registry, which notifies of any new global objects |
108 | | - // appearing on the remote. |
109 | | - // The callback will only get called as long as we keep the returned listener alive. |
110 | | - let _registry_listener = registry |
111 | | - .add_listener_local() |
112 | | - .global(move |global| { |
113 | | - let Some(global_props) = global.props else { |
114 | | - return; |
115 | | - }; |
116 | | - match &global.type_ { |
117 | | - ObjectType::Node => { |
118 | | - client |
119 | | - .data |
120 | | - .lock() |
121 | | - .unwrap() |
122 | | - .nodes |
123 | | - .insert(global.id, Node::new(global.id, global_props)); |
124 | | - updated_copy.set(true); |
125 | | - } |
126 | | - ObjectType::Link => { |
127 | | - let Some(link) = Link::new(global_props) else { |
128 | | - return; |
129 | | - }; |
130 | | - client.data.lock().unwrap().links.insert(global.id, link); |
131 | | - updated_copy.set(true); |
132 | | - } |
133 | | - _ => (), |
134 | | - } |
135 | | - }) |
136 | | - .global_remove(move |uid| { |
137 | | - let mut data = client.data.lock().unwrap(); |
138 | | - if data.nodes.remove(&uid).is_some() || data.links.remove(&uid).is_some() { |
139 | | - updated_copy2.set(true); |
140 | | - } |
141 | | - }) |
142 | | - .register(); |
143 | | - |
144 | | - loop { |
145 | | - main_loop.loop_().iterate(Duration::from_secs(60 * 60 * 24)); |
146 | | - if updated.get() { |
147 | | - updated.set(false); |
148 | | - client |
149 | | - .event_listeners |
150 | | - .lock() |
151 | | - .unwrap() |
152 | | - .retain(|notify| notify.upgrade().inspect(|x| x.notify_one()).is_some()); |
153 | | - } |
154 | | - } |
155 | | - } |
156 | | - |
157 | | - fn add_event_listener(&self, notify: &Arc<Notify>) { |
158 | | - self.event_listeners |
159 | | - .lock() |
160 | | - .unwrap() |
161 | | - .push(Arc::downgrade(notify)); |
162 | | - } |
163 | | -} |
| 5 | +use crate::pipewire::{CLIENT, EventKind, Link, Node}; |
164 | 6 |
|
165 | 7 | #[derive(Deserialize, Debug, SmartDefault)] |
166 | 8 | #[serde(rename_all = "lowercase", deny_unknown_fields, default)] |
@@ -191,15 +33,18 @@ impl NodeDisplay { |
191 | 33 |
|
192 | 34 | pub(super) struct Monitor<'a> { |
193 | 35 | config: &'a Config, |
194 | | - notify: Arc<Notify>, |
| 36 | + updates: UnboundedReceiver<EventKind>, |
195 | 37 | } |
196 | 38 |
|
197 | 39 | impl<'a> Monitor<'a> { |
198 | 40 | pub(super) async fn new(config: &'a Config) -> Result<Self> { |
199 | 41 | let client = CLIENT.as_ref().error("Could not get client")?; |
200 | | - let notify = Arc::new(Notify::new()); |
201 | | - client.add_event_listener(¬ify); |
202 | | - Ok(Self { config, notify }) |
| 42 | + let (tx, rx) = unbounded_channel(); |
| 43 | + client.add_event_listener(tx); |
| 44 | + Ok(Self { |
| 45 | + config, |
| 46 | + updates: rx, |
| 47 | + }) |
203 | 48 | } |
204 | 49 | } |
205 | 50 |
|
@@ -260,7 +105,16 @@ impl PrivacyMonitor for Monitor<'_> { |
260 | 105 | } |
261 | 106 |
|
262 | 107 | async fn wait_for_change(&mut self) -> Result<()> { |
263 | | - self.notify.notified().await; |
| 108 | + while let Some(event) = self.updates.recv().await { |
| 109 | + if event.intersects( |
| 110 | + EventKind::NODE_ADDED |
| 111 | + | EventKind::NODE_REMOVED |
| 112 | + | EventKind::LINK_ADDED |
| 113 | + | EventKind::LINK_REMOVED, |
| 114 | + ) { |
| 115 | + break; |
| 116 | + } |
| 117 | + } |
264 | 118 | Ok(()) |
265 | 119 | } |
266 | 120 | } |
0 commit comments