Skip to content

Commit 82dcc3c

Browse files
committed
add: Stream-based File Uploads w/ real-time UI feedback
An enhanced uploading experience: stable and real-time.
1 parent 36aa222 commit 82dcc3c

File tree

3 files changed

+281
-23
lines changed

3 files changed

+281
-23
lines changed

src-tauri/src/lib.rs

+46-21
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::borrow::Cow;
2+
use std::sync::Arc;
23
use argon2::{Argon2, Params, Version};
34
use lazy_static::lazy_static;
45
use nostr_sdk::prelude::*;
@@ -26,6 +27,9 @@ use voice::AudioRecorder;
2627

2728
mod net;
2829

30+
mod upload;
31+
use upload::{upload_data_with_progress, ProgressCallback};
32+
2933
mod util;
3034
use util::{extract_https_urls, get_file_type_description};
3135

@@ -772,9 +776,10 @@ async fn message(receiver: String, content: String, replied_to: String, file: Op
772776
.iter()
773777
.filter(|m| m.pending)
774778
.count();
775-
let pending_id = String::from("pending-") + &pending_count.to_string();
779+
// Create persistent pending_id that will live for the entire function
780+
let pending_id = Arc::new(String::from("pending-") + &pending_count.to_string());
776781
let msg = Message {
777-
id: pending_id.clone(),
782+
id: pending_id.as_ref().clone(),
778783
content,
779784
replied_to,
780785
preview_metadata: None,
@@ -790,13 +795,6 @@ async fn message(receiver: String, content: String, replied_to: String, file: Op
790795
};
791796
STATE.lock().await.add_message(&receiver, msg.clone());
792797

793-
// Send the pending message to our frontend
794-
let handle = TAURI_APP.get().unwrap();
795-
handle.emit("message_new", serde_json::json!({
796-
"message": &msg,
797-
"chat_id": &receiver
798-
})).unwrap();
799-
800798
// Grab our pubkey
801799
let client = NOSTR_CLIENT.get().expect("Nostr client not initialized");
802800
let signer = client.signer().await.unwrap();
@@ -806,7 +804,14 @@ async fn message(receiver: String, content: String, replied_to: String, file: Op
806804
let receiver_pubkey = PublicKey::from_bech32(receiver.clone().as_str()).unwrap();
807805

808806
// Prepare the NIP-17 rumor
807+
let handle = TAURI_APP.get().unwrap();
809808
let mut rumor = if file.is_none() {
809+
// Send the text message to our frontend
810+
handle.emit("message_new", serde_json::json!({
811+
"message": &msg,
812+
"chat_id": &receiver
813+
})).unwrap();
814+
810815
// Text Message
811816
EventBuilder::private_msg_rumor(receiver_pubkey, msg.content)
812817
} else {
@@ -818,10 +823,13 @@ async fn message(receiver: String, content: String, replied_to: String, file: Op
818823

819824
// Update the attachment in-state
820825
{
826+
// Use a clone of the Arc for this block
827+
let pending_id_clone = Arc::clone(&pending_id);
828+
821829
// Retrieve the Pending Message
822830
let mut state = STATE.lock().await;
823831
let chat = state.get_profile_mut(&receiver).unwrap();
824-
let message = chat.get_message_mut(&pending_id).unwrap();
832+
let message = chat.get_message_mut(pending_id_clone.as_ref()).unwrap();
825833

826834
// Choose the appropriate base directory based on platform
827835
let base_directory = if cfg!(target_os = "ios") {
@@ -856,9 +864,8 @@ async fn message(receiver: String, content: String, replied_to: String, file: Op
856864
downloaded: true
857865
});
858866

859-
// Update the frontend
860-
handle.emit("message_update", serde_json::json!({
861-
"old_id": &pending_id,
867+
// Send the pending file upload to our frontend
868+
handle.emit("message_new", serde_json::json!({
862869
"message": &message,
863870
"chat_id": &receiver
864871
})).unwrap();
@@ -889,7 +896,21 @@ async fn message(receiver: String, content: String, replied_to: String, file: Op
889896
let signer = client.signer().await.unwrap();
890897
let conf = PRIVATE_NIP96_CONFIG.wait();
891898
let file_size = enc_file.len();
892-
match upload_data(&signer, &conf, enc_file, Some(mime_type), None).await {
899+
// Clone the Arc outside the closure for use inside a seperate-threaded progress callback
900+
let pending_id_for_callback = Arc::clone(&pending_id);
901+
// Create a progress callback for file uploads
902+
let progress_callback: ProgressCallback = Box::new(move |percentage, _| {
903+
// This is a simple callback that logs progress but could be enhanced to emit events
904+
if let Some(pct) = percentage {
905+
handle.emit("attachment_upload_progress", serde_json::json!({
906+
"id": pending_id_for_callback.as_ref(),
907+
"progress": pct
908+
})).unwrap();
909+
}
910+
Ok(())
911+
});
912+
913+
match upload_data_with_progress(&signer, &conf, enc_file, Some(mime_type), None, progress_callback).await {
893914
Ok(url) => {
894915
// Create the attachment rumor
895916
let attachment_rumor = EventBuilder::new(Kind::from_u16(15), url.to_string());
@@ -905,14 +926,15 @@ async fn message(receiver: String, content: String, replied_to: String, file: Op
905926
},
906927
Err(_) => {
907928
// The file upload failed: so we mark the message as failed and notify of an error
929+
let pending_id_for_failure = Arc::clone(&pending_id);
908930
let mut state = STATE.lock().await;
909931
let chat = state.get_profile_mut(&receiver).unwrap();
910-
let failed_msg = chat.get_message_mut(&pending_id).unwrap();
932+
let failed_msg = chat.get_message_mut(pending_id_for_failure.as_ref()).unwrap();
911933
failed_msg.failed = true;
912934

913935
// Update the frontend
914936
handle.emit("message_update", serde_json::json!({
915-
"old_id": &pending_id,
937+
"old_id": pending_id_for_failure.as_ref(),
916938
"message": &failed_msg,
917939
"chat_id": &receiver
918940
})).unwrap();
@@ -948,15 +970,16 @@ async fn message(receiver: String, content: String, replied_to: String, file: Op
948970
{
949971
Ok(_) => {
950972
// Mark the message as a success
973+
let pending_id_for_success = Arc::clone(&pending_id);
951974
let mut state = STATE.lock().await;
952975
let chat = state.get_profile_mut(&receiver).unwrap();
953-
let sent_msg = chat.get_message_mut(&pending_id).unwrap();
976+
let sent_msg = chat.get_message_mut(pending_id_for_success.as_ref()).unwrap();
954977
sent_msg.id = rumor_id.to_hex();
955978
sent_msg.pending = false;
956979

957980
// Update the frontend
958981
handle.emit("message_update", serde_json::json!({
959-
"old_id": &pending_id,
982+
"old_id": pending_id_for_success.as_ref(),
960983
"message": &sent_msg,
961984
"chat_id": &receiver
962985
})).unwrap();
@@ -969,15 +992,16 @@ async fn message(receiver: String, content: String, replied_to: String, file: Op
969992
Err(_) => {
970993
// This is an odd case; the message was sent to the receiver, but NOT ourselves
971994
// We'll class it as sent, for now...
995+
let pending_id_for_partial = Arc::clone(&pending_id);
972996
let mut state = STATE.lock().await;
973997
let chat = state.get_profile_mut(&receiver).unwrap();
974-
let sent_ish_msg = chat.get_message_mut(&pending_id).unwrap();
998+
let sent_ish_msg = chat.get_message_mut(pending_id_for_partial.as_ref()).unwrap();
975999
sent_ish_msg.id = rumor_id.to_hex();
9761000
sent_ish_msg.pending = false;
9771001

9781002
// Update the frontend
9791003
handle.emit("message_update", serde_json::json!({
980-
"old_id": &pending_id,
1004+
"old_id": pending_id_for_partial.as_ref(),
9811005
"message": &sent_ish_msg,
9821006
"chat_id": &receiver
9831007
})).unwrap();
@@ -991,9 +1015,10 @@ async fn message(receiver: String, content: String, replied_to: String, file: Op
9911015
}
9921016
Err(_) => {
9931017
// Mark the message as a failure, bad message, bad!
1018+
let pending_id_for_final = Arc::clone(&pending_id);
9941019
let mut state = STATE.lock().await;
9951020
let chat = state.get_profile_mut(&receiver).unwrap();
996-
let failed_msg = chat.get_message_mut(&pending_id).unwrap();
1021+
let failed_msg = chat.get_message_mut(pending_id_for_final.as_ref()).unwrap();
9971022
failed_msg.failed = true;
9981023
return Ok(false);
9991024
}

src-tauri/src/upload.rs

+199
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
use nostr_sdk::{
2+
nips::nip96::{Error, ServerConfig, UploadResponse, UploadResponseStatus},
3+
nips::nip98::{HttpData, HttpMethod},
4+
NostrSigner, TagKind, TagStandard, Url,
5+
};
6+
use nostr_sdk::hashes::{sha256::Hash as Sha256Hash, Hash};
7+
use reqwest::{
8+
multipart::{self, Part},
9+
Body, Client
10+
};
11+
use std::net::SocketAddr;
12+
use tokio::sync::mpsc;
13+
use std::sync::{Arc, Mutex};
14+
15+
/// Makes a reqwest client
16+
fn make_client(proxy: Option<SocketAddr>) -> Result<Client, Error> {
17+
let client: Client = {
18+
let mut builder = Client::builder();
19+
if let Some(proxy) = proxy {
20+
let proxy = format!("socks5h://{proxy}");
21+
use reqwest::Proxy;
22+
builder = builder.proxy(Proxy::all(proxy)?);
23+
}
24+
builder.build()?
25+
};
26+
27+
Ok(client)
28+
}
29+
30+
/// Custom upload stream that allows tracking progress
31+
struct ProgressTrackingStream {
32+
total_size: u64,
33+
bytes_sent: Arc<Mutex<u64>>,
34+
inner: mpsc::Receiver<Result<Vec<u8>, std::io::Error>>,
35+
}
36+
37+
impl ProgressTrackingStream {
38+
fn new(data: Vec<u8>, bytes_sent: Arc<Mutex<u64>>) -> Self {
39+
let total_size = data.len() as u64;
40+
let (tx, rx) = mpsc::channel(8); // Buffer size of 8 chunks
41+
42+
// Spawn a background task to feed the stream
43+
tokio::spawn(async move {
44+
let chunk_size = 64 * 1024; // 64 KB chunks
45+
let mut position = 0;
46+
47+
while position < data.len() {
48+
let end = std::cmp::min(position + chunk_size, data.len());
49+
let chunk = data[position..end].to_vec();
50+
let chunk_size = chunk.len();
51+
52+
// Send chunk through channel
53+
if tx.send(Ok(chunk)).await.is_err() {
54+
break; // Receiver was dropped
55+
}
56+
57+
position += chunk_size;
58+
}
59+
});
60+
61+
Self {
62+
total_size,
63+
bytes_sent,
64+
inner: rx,
65+
}
66+
}
67+
}
68+
69+
impl futures_util::Stream for ProgressTrackingStream {
70+
type Item = Result<Vec<u8>, std::io::Error>;
71+
72+
fn poll_next(
73+
mut self: std::pin::Pin<&mut Self>,
74+
cx: &mut std::task::Context<'_>,
75+
) -> std::task::Poll<Option<Self::Item>> {
76+
use std::task::Poll;
77+
78+
match self.inner.poll_recv(cx) {
79+
Poll::Ready(Some(result)) => {
80+
// Update the bytes sent counter
81+
if let Ok(chunk) = &result {
82+
let mut bytes_sent = self.bytes_sent.lock().unwrap();
83+
*bytes_sent += chunk.len() as u64;
84+
}
85+
Poll::Ready(Some(result))
86+
}
87+
Poll::Ready(None) => Poll::Ready(None),
88+
Poll::Pending => Poll::Pending,
89+
}
90+
}
91+
}
92+
93+
/// Progress callback function type
94+
pub type ProgressCallback = Box<dyn Fn(Option<u8>, Option<u64>) -> Result<(), String> + Send + Sync>;
95+
96+
/// Uploads data to a NIP-96 server with progress callback
97+
///
98+
/// This function extends the standard NIP-96 upload_data function by adding progress reporting
99+
/// via a callback function that is called periodically during the upload process.
100+
pub async fn upload_data_with_progress<T>(
101+
signer: &T,
102+
desc: &ServerConfig,
103+
file_data: Vec<u8>,
104+
mime_type: Option<&str>,
105+
proxy: Option<SocketAddr>,
106+
progress_callback: ProgressCallback,
107+
) -> Result<Url, Error>
108+
where
109+
T: NostrSigner,
110+
{
111+
// Build NIP98 Authorization header
112+
let payload: Sha256Hash = Sha256Hash::hash(&file_data);
113+
let data = HttpData::new(desc.api_url.clone(), HttpMethod::POST).payload(payload);
114+
let nip98_auth: String = data.to_authorization(signer).await?;
115+
116+
// Create shared counter for tracking upload progress
117+
let bytes_sent = Arc::new(Mutex::new(0u64));
118+
let total_size = file_data.len() as u64;
119+
120+
// Report initial progress (0%)
121+
progress_callback(Some(0), Some(0)).map_err(|e| Error::UploadError(e))?;
122+
123+
// Make client
124+
let client: Client = make_client(proxy)?;
125+
126+
// Create form with tracking stream
127+
let file_part = {
128+
let tracking_stream = ProgressTrackingStream::new(file_data.clone(), bytes_sent.clone());
129+
let body = Body::wrap_stream(tracking_stream);
130+
131+
let mut part = Part::stream(body)
132+
.file_name("filename");
133+
134+
// Set MIME type if provided
135+
if let Some(mime_str) = mime_type {
136+
part = part.mime_str(mime_str).map_err(|_| Error::MultipartMimeError)?;
137+
}
138+
139+
part
140+
};
141+
142+
let form = multipart::Form::new().part("file", file_part);
143+
144+
// Launch upload as a future, but don't await it yet
145+
let mut response_future = client
146+
.post(desc.api_url.clone())
147+
.header("Authorization", nip98_auth)
148+
.multipart(form)
149+
.send();
150+
151+
// Create a future that polls the bytes_sent counter periodically
152+
let mut last_percentage = 0;
153+
let mut poll_interval = tokio::time::interval(tokio::time::Duration::from_millis(100));
154+
155+
// Use tokio::select to concurrently wait for the response and report progress
156+
let response = loop {
157+
tokio::select! {
158+
// Check if the response is ready
159+
response = &mut response_future => {
160+
break response?;
161+
},
162+
// Report progress periodically
163+
_ = poll_interval.tick() => {
164+
let current_bytes = *bytes_sent.lock().unwrap();
165+
let percentage = if total_size > 0 {
166+
((current_bytes as f64 / total_size as f64) * 100.0) as u8
167+
} else {
168+
0
169+
};
170+
171+
// Only report when percentage changes to reduce events
172+
if percentage > last_percentage {
173+
if let Err(e) = progress_callback(Some(percentage), Some(current_bytes)) {
174+
return Err(Error::UploadError(e));
175+
}
176+
last_percentage = percentage;
177+
}
178+
}
179+
}
180+
};
181+
182+
// Report 100% completion
183+
progress_callback(Some(100), Some(total_size)).map_err(|e| Error::UploadError(e))?;
184+
185+
// Decode response
186+
let res: UploadResponse = response.json().await?;
187+
188+
// Check status
189+
if res.status == UploadResponseStatus::Error {
190+
return Err(Error::UploadError(res.message));
191+
}
192+
193+
// Extract url
194+
let nip94_event = res.nip94_event.ok_or(Error::ResponseDecodeError)?;
195+
match nip94_event.tags.find_standardized(TagKind::Url) {
196+
Some(TagStandard::Url(url)) => Ok(url.clone()),
197+
_ => Err(Error::ResponseDecodeError),
198+
}
199+
}

0 commit comments

Comments
 (0)