Skip to content

Commit a24cec6

Browse files
committed
more progress
1 parent f236e3d commit a24cec6

File tree

3 files changed

+126
-124
lines changed

3 files changed

+126
-124
lines changed

crates/audio/src/utils.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,6 @@ pub mod macos {
77
Some(device) => match device.streams() {
88
Ok(streams) => streams.iter().any(|s| {
99
if let Ok(term_type) = s.terminal_type() {
10-
println!(
11-
"device={} / term_type={:?}",
12-
device.name().unwrap(),
13-
term_type
14-
);
15-
1610
term_type.0 == io::audio::output_term::HEADPHONES
1711
|| term_type == ca::StreamTerminalType::HEADPHONES
1812
} else {

plugins/listener/src/actors/listen.rs

Lines changed: 122 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -47,123 +47,7 @@ impl Actor for ListenBridge {
4747
myself: ActorRef<Self::Msg>,
4848
args: Self::Arguments,
4949
) -> Result<Self::State, ActorProcessingErr> {
50-
let (tx, rx) =
51-
tokio::sync::mpsc::channel::<MixedMessage<(Bytes, Bytes), ControlMessage>>(32);
52-
53-
let conn = {
54-
use tauri_plugin_local_stt::LocalSttPluginExt;
55-
56-
match args.app.get_connection().await {
57-
Ok(c) => c,
58-
Err(e) => {
59-
tracing::error!("failed_to_get_connection: {:?}", e);
60-
return Err(ActorProcessingErr::from(e));
61-
}
62-
}
63-
};
64-
65-
let client = owhisper_client::ListenClient::builder()
66-
.api_base(conn.base_url)
67-
.api_key(conn.api_key.unwrap_or_default())
68-
.params(owhisper_interface::ListenParams {
69-
model: conn.model,
70-
languages: args.languages,
71-
redemption_time_ms: Some(if args.onboarding { 60 } else { 400 }),
72-
..Default::default()
73-
})
74-
.build_dual();
75-
76-
let rx_task = tokio::spawn({
77-
let app = args.app.clone();
78-
let session_id = args.session_id.clone();
79-
80-
async move {
81-
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
82-
let (listen_stream, _handle) = match client.from_realtime_audio(outbound).await {
83-
Ok(res) => res,
84-
Err(e) => {
85-
tracing::error!("listen_ws_connect_failed: {:?}", e);
86-
myself.stop(Some(format!("listen_ws_connect_failed: {:?}", e)));
87-
return;
88-
}
89-
};
90-
futures_util::pin_mut!(listen_stream);
91-
92-
let mut manager = TranscriptManager::with_unix_timestamp(args.session_start_ts_ms);
93-
94-
loop {
95-
match tokio::time::timeout(LISTEN_STREAM_TIMEOUT, listen_stream.next()).await {
96-
Ok(Some(response)) => {
97-
let diff = manager.append(response.clone());
98-
99-
let partial_words_by_channel: HashMap<usize, Vec<Word2>> = diff
100-
.partial_words
101-
.iter()
102-
.map(|(channel_idx, words)| {
103-
(
104-
*channel_idx,
105-
words
106-
.iter()
107-
.map(|w| Word2::from(w.clone()))
108-
.collect::<Vec<_>>(),
109-
)
110-
})
111-
.collect();
112-
113-
SessionEvent::PartialWords {
114-
words: partial_words_by_channel,
115-
}
116-
.emit(&app)
117-
.unwrap();
118-
119-
let final_words_by_channel: HashMap<usize, Vec<Word2>> = diff
120-
.final_words
121-
.iter()
122-
.map(|(channel_idx, words)| {
123-
(
124-
*channel_idx,
125-
words
126-
.iter()
127-
.map(|w| Word2::from(w.clone()))
128-
.collect::<Vec<_>>(),
129-
)
130-
})
131-
.collect();
132-
133-
update_session(
134-
&app,
135-
&session_id,
136-
final_words_by_channel
137-
.clone()
138-
.values()
139-
.flatten()
140-
.cloned()
141-
.collect(),
142-
)
143-
.await
144-
.unwrap();
145-
146-
SessionEvent::FinalWords {
147-
words: final_words_by_channel,
148-
}
149-
.emit(&app)
150-
.unwrap();
151-
}
152-
Ok(None) => {
153-
tracing::info!("listen_stream_ended");
154-
break;
155-
}
156-
Err(_) => {
157-
tracing::info!("listen_stream_timeout");
158-
break;
159-
}
160-
}
161-
}
162-
163-
myself.stop(None);
164-
}
165-
});
166-
50+
let (tx, rx_task) = spawn_rx_task(args, myself).await.unwrap();
16751
Ok(ListenState { tx, rx_task })
16852
}
16953

@@ -191,6 +75,127 @@ impl Actor for ListenBridge {
19175
}
19276
}
19377

78+
async fn spawn_rx_task(
79+
args: ListenArgs,
80+
myself: ActorRef<ListenMsg>,
81+
) -> Result<
82+
(
83+
tokio::sync::mpsc::Sender<MixedMessage<(Bytes, Bytes), ControlMessage>>,
84+
tokio::task::JoinHandle<()>,
85+
),
86+
ActorProcessingErr,
87+
> {
88+
let (tx, rx) = tokio::sync::mpsc::channel::<MixedMessage<(Bytes, Bytes), ControlMessage>>(32);
89+
90+
let app = args.app.clone();
91+
let session_id = args.session_id.clone();
92+
let session_start_ts_ms = args.session_start_ts_ms;
93+
94+
let conn = {
95+
use tauri_plugin_local_stt::LocalSttPluginExt;
96+
app.get_connection().await?
97+
};
98+
99+
let client = owhisper_client::ListenClient::builder()
100+
.api_base(conn.base_url)
101+
.api_key(conn.api_key.unwrap_or_default())
102+
.params(owhisper_interface::ListenParams {
103+
model: conn.model,
104+
languages: args.languages,
105+
redemption_time_ms: Some(if args.onboarding { 60 } else { 400 }),
106+
..Default::default()
107+
})
108+
.build_dual();
109+
110+
let rx_task = tokio::spawn(async move {
111+
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
112+
let (listen_stream, _handle) = match client.from_realtime_audio(outbound).await {
113+
Ok(res) => res,
114+
Err(e) => {
115+
tracing::error!("listen_ws_connect_failed: {:?}", e);
116+
myself.stop(Some(format!("listen_ws_connect_failed: {:?}", e)));
117+
return;
118+
}
119+
};
120+
futures_util::pin_mut!(listen_stream);
121+
122+
let mut manager = TranscriptManager::with_unix_timestamp(session_start_ts_ms);
123+
124+
loop {
125+
match tokio::time::timeout(LISTEN_STREAM_TIMEOUT, listen_stream.next()).await {
126+
Ok(Some(response)) => {
127+
let diff = manager.append(response.clone());
128+
129+
let partial_words_by_channel: HashMap<usize, Vec<Word2>> = diff
130+
.partial_words
131+
.iter()
132+
.map(|(channel_idx, words)| {
133+
(
134+
*channel_idx,
135+
words
136+
.iter()
137+
.map(|w| Word2::from(w.clone()))
138+
.collect::<Vec<_>>(),
139+
)
140+
})
141+
.collect();
142+
143+
SessionEvent::PartialWords {
144+
words: partial_words_by_channel,
145+
}
146+
.emit(&app)
147+
.unwrap();
148+
149+
let final_words_by_channel: HashMap<usize, Vec<Word2>> = diff
150+
.final_words
151+
.iter()
152+
.map(|(channel_idx, words)| {
153+
(
154+
*channel_idx,
155+
words
156+
.iter()
157+
.map(|w| Word2::from(w.clone()))
158+
.collect::<Vec<_>>(),
159+
)
160+
})
161+
.collect();
162+
163+
update_session(
164+
&app,
165+
&session_id,
166+
final_words_by_channel
167+
.clone()
168+
.values()
169+
.flatten()
170+
.cloned()
171+
.collect(),
172+
)
173+
.await
174+
.unwrap();
175+
176+
SessionEvent::FinalWords {
177+
words: final_words_by_channel,
178+
}
179+
.emit(&app)
180+
.unwrap();
181+
}
182+
Ok(None) => {
183+
tracing::info!("listen_stream_ended");
184+
break;
185+
}
186+
Err(_) => {
187+
tracing::info!("listen_stream_timeout");
188+
break;
189+
}
190+
}
191+
}
192+
193+
myself.stop(None);
194+
});
195+
196+
Ok((tx, rx_task))
197+
}
198+
194199
async fn update_session<R: tauri::Runtime>(
195200
app: &tauri::AppHandle<R>,
196201
session_id: impl Into<String>,

plugins/listener/src/actors/processor.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,11 @@ impl Actor for AudioProcessor {
101101
ProcMsg::Mixed(mut c) => {
102102
st.agc_m.process(&mut c.data);
103103
let arc = Arc::<[f32]>::from(c.data);
104+
104105
st.last_mic = Some(arc.clone());
105-
st.joiner.push_mic(arc);
106+
st.last_spk = Some(arc.clone());
107+
st.joiner.push_mic(arc.clone());
108+
st.joiner.push_spk(arc);
106109
process_ready(st).await;
107110
}
108111
}

0 commit comments

Comments
 (0)