Skip to content

fix: realtime late join #6869

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions src/mimefactory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use std::io::Cursor;

use anyhow::{Context as _, Result, bail, ensure};
use base64::Engine as _;
use data_encoding::BASE32_NOPAD;
use deltachat_contact_tools::sanitize_bidi_characters;
use iroh_gossip::proto::TopicId;
use mail_builder::headers::HeaderType;
use mail_builder::headers::address::{Address, EmailAddress};
use mail_builder::mime::MimePart;
Expand All @@ -21,14 +23,14 @@ use crate::contact::{Contact, ContactId, Origin};
use crate::context::Context;
use crate::e2ee::EncryptHelper;
use crate::ephemeral::Timer as EphemeralTimer;
use crate::key::self_fingerprint;
use crate::key::{DcKey, SignedPublicKey};
use crate::headerdef::HeaderDef;
use crate::key::{DcKey, SignedPublicKey, self_fingerprint};
use crate::location;
use crate::log::{info, warn};
use crate::message::{Message, MsgId, Viewtype};
use crate::mimeparser::{SystemMessage, is_hidden};
use crate::param::Param;
use crate::peer_channels::create_iroh_header;
use crate::peer_channels::{create_iroh_header, get_iroh_topic_for_msg};
use crate::simplify::escape_message_footer_marks;
use crate::stock_str;
use crate::tools::{
Expand Down Expand Up @@ -139,6 +141,9 @@ pub struct MimeFactory {

/// True if the avatar should be attached.
pub attach_selfavatar: bool,

/// Sustain webxdc topic on resend.
webxdc_topic: Option<TopicId>,
}

/// Result of rendering a message, ready to be submitted to a send job.
Expand Down Expand Up @@ -449,7 +454,7 @@ impl MimeFactory {
member_timestamps.is_empty()
|| to.len() + past_members.len() == member_timestamps.len()
);

let webxdc_topic = get_iroh_topic_for_msg(context, msg.id).await?;
let factory = MimeFactory {
from_addr,
from_displayname,
Expand All @@ -469,6 +474,7 @@ impl MimeFactory {
last_added_location_id: None,
sync_ids_to_delete: None,
attach_selfavatar,
webxdc_topic,
};
Ok(factory)
}
Expand Down Expand Up @@ -516,6 +522,7 @@ impl MimeFactory {
last_added_location_id: None,
sync_ids_to_delete: None,
attach_selfavatar: false,
webxdc_topic: None,
};

Ok(res)
Expand Down Expand Up @@ -1492,7 +1499,7 @@ impl MimeFactory {
}
SystemMessage::IrohNodeAddr => {
headers.push((
"Iroh-Node-Addr",
HeaderDef::IrohNodeAddr.into(),
mail_builder::headers::text::Text::new(serde_json::to_string(
&context
.get_or_try_init_peer_channel()
Expand Down Expand Up @@ -1673,10 +1680,13 @@ impl MimeFactory {
let json = msg.param.get(Param::Arg).unwrap_or_default();
parts.push(context.build_status_update_part(json));
} else if msg.viewtype == Viewtype::Webxdc {
let topic = self
.webxdc_topic
.map(|top| BASE32_NOPAD.encode(top.as_bytes()).to_ascii_lowercase())
.unwrap_or(create_iroh_header(context, msg.id).await?);
headers.push((
"Iroh-Gossip-Topic",
mail_builder::headers::raw::Raw::new(create_iroh_header(context, msg.id).await?)
.into(),
HeaderDef::IrohGossipTopic.into(),
mail_builder::headers::raw::Raw::new(topic).into(),
));
if let (Some(json), _) = context
.render_webxdc_status_update_object(
Expand Down
147 changes: 119 additions & 28 deletions src/peer_channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ use crate::mimeparser::SystemMessage;
const PUBLIC_KEY_LENGTH: usize = 32;
const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();

/// Store iroh peer channels for the context.
/// Store Iroh peer channels for the context.
#[derive(Debug)]
pub struct Iroh {
/// iroh router needed for iroh peer channels.
/// Iroh router needed for Iroh peer channels.
pub(crate) router: iroh::protocol::Router,

/// [Gossip] needed for iroh peer channels.
/// [Gossip] needed for Iroh peer channels.
pub(crate) gossip: Gossip,

/// Sequence numbers for gossip channels.
Expand Down Expand Up @@ -109,7 +109,7 @@ impl Iroh {

info!(
ctx,
"IROH_REALTIME: Joining gossip with peers: {:?}", node_ids,
"IROH_REALTIME: Joining gossip {topic} with peers: {:?}", node_ids,
);

// Inform iroh of potentially new node addresses
Expand Down Expand Up @@ -138,17 +138,11 @@ impl Iroh {
Ok(Some(join_rx))
}

/// Add gossip peers to realtime channel if it is already active.
pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec<NodeAddr>) -> Result<()> {
/// Add gossip peer to realtime channel if it is already active.
pub async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
if self.iroh_channels.read().await.get(&topic).is_some() {
for peer in &peers {
self.router.endpoint().add_node_addr(peer.clone())?;
}

self.gossip.subscribe_with_opts(
topic,
JoinOptions::with_bootstrap(peers.into_iter().map(|peer| peer.node_id)),
);
self.router.endpoint().add_node_addr(peer.clone())?;
self.gossip.subscribe(topic, vec![peer.node_id])?;
}
Ok(())
}
Expand Down Expand Up @@ -198,7 +192,7 @@ impl Iroh {
}

/// Leave the realtime channel for a given topic.
pub(crate) async fn leave_realtime(&self, topic: TopicId) -> Result<()> {
pub async fn leave_realtime(&self, topic: TopicId) -> Result<()> {
if let Some(channel) = self.iroh_channels.write().await.remove(&topic) {
// Dropping the last GossipTopic results in quitting the topic.
// It is split into GossipReceiver and GossipSender.
Expand Down Expand Up @@ -316,10 +310,21 @@ impl Context {
}
}
}

pub(crate) async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
if let Some(iroh) = &*self.iroh.read().await {
info!(
self,
"Adding (maybe existing) peer with id {} to {topic}", peer.node_id
);
iroh.maybe_add_gossip_peer(topic, peer).await?;
}
Ok(())
}
}

/// Cache a peers [NodeId] for one topic.
pub(crate) async fn iroh_add_peer_for_topic(
pub async fn iroh_add_peer_for_topic(
ctx: &Context,
msg_id: MsgId,
topic: TopicId,
Expand All @@ -336,6 +341,7 @@ pub(crate) async fn iroh_add_peer_for_topic(
}

/// Add gossip peer from `Iroh-Node-Addr` header to WebXDC message identified by `instance_id`.
/// This should not start iroh, because receiving a NodeAddr does not mean you want to participate.
pub async fn add_gossip_peer_from_header(
context: &Context,
instance_id: MsgId,
Expand All @@ -348,12 +354,13 @@ pub async fn add_gossip_peer_from_header(
return Ok(());
}

let node_addr =
serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;

info!(
context,
"Adding iroh peer with address {node_addr:?} to the topic of {instance_id}."
"Adding iroh peer with node id {} to the topic of {instance_id}.", node_addr.node_id
);
let node_addr =
serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;

context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived {
msg_id: instance_id,
Expand All @@ -371,13 +378,12 @@ pub async fn add_gossip_peer_from_header(
let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?;

let iroh = context.get_or_try_init_peer_channel().await?;
iroh.maybe_add_gossip_peers(topic, vec![node_addr]).await?;
context.maybe_add_gossip_peer(topic, node_addr).await?;
Ok(())
}

/// Insert topicId into the database so that we can use it to retrieve the topic.
pub(crate) async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> {
pub async fn insert_topic_stub(ctx: &Context, msg_id: MsgId, topic: TopicId) -> Result<()> {
ctx.sql
.execute(
"INSERT OR REPLACE INTO iroh_gossip_peers (msg_id, public_key, topic, relay_server) VALUES (?, ?, ?, ?)",
Expand Down Expand Up @@ -555,10 +561,12 @@ mod tests {
use super::*;
use crate::{
EventType,
chat::send_msg,
chat::{self, ChatId, ProtectionStatus, add_contact_to_chat, resend_msgs, send_msg},
message::{Message, Viewtype},
test_utils::TestContextManager,
test_utils::{TestContext, TestContextManager},
};
use std::time::Duration;
use tokio::time::timeout;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_can_communicate() {
Expand Down Expand Up @@ -924,8 +932,30 @@ mod tests {
let alice = &mut tcm.alice().await;
let bob = &mut tcm.bob().await;

let chat = alice.create_chat(bob).await.id;

let mut instance = Message::new(Viewtype::File);
instance
.set_file_from_bytes(
alice,
"minimal.xdc",
include_bytes!("../test-data/webxdc/minimal.xdc"),
None,
)
.unwrap();
connect_alice_bob(alice, bob, chat, &mut instance).await
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_webxdc_resend() {
let mut tcm = TestContextManager::new();
let alice = &mut tcm.alice().await;
let bob = &mut tcm.bob().await;
let group = chat::create_group_chat(alice, ProtectionStatus::Unprotected, "group chat")
.await
.unwrap();

// Alice sends webxdc to bob
let alice_chat = alice.create_chat(bob).await;
let mut instance = Message::new(Viewtype::File);
instance
.set_file_from_bytes(
Expand All @@ -935,7 +965,66 @@ mod tests {
None,
)
.unwrap();
send_msg(alice, alice_chat.id, &mut instance).await.unwrap();

add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(bob).await)
.await
.unwrap();

connect_alice_bob(alice, bob, group, &mut instance).await;

// fiona joins late
let fiona = &mut tcm.fiona().await;

add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(fiona).await)
.await
.unwrap();

resend_msgs(alice, &[instance.id]).await.unwrap();
let msg = alice.pop_sent_msg().await;
let fiona_instance = fiona.recv_msg(&msg).await;
fiona_instance.chat_id.accept(fiona).await.unwrap();

let fiona_connect_future = send_webxdc_realtime_advertisement(fiona, fiona_instance.id)
.await
.unwrap()
.unwrap();
let fiona_advert = fiona.pop_sent_msg().await;
alice.recv_msg_trash(&fiona_advert).await;

timeout(Duration::from_secs(2), fiona_connect_future)
.await
.unwrap()
.unwrap();
send_webxdc_realtime_data(alice, instance.id, b"alice -> bob & fiona".into())
.await
.unwrap();

timeout(Duration::from_secs(2), async {
loop {
let event = fiona.evtracker.recv().await.unwrap();
if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
if data == b"alice -> bob & fiona" {
break;
} else {
panic!(
"Unexpected status update: {}",
String::from_utf8_lossy(&data)
);
}
}
}
})
.await
.unwrap();
}

async fn connect_alice_bob(
alice: &mut TestContext,
bob: &mut TestContext,
chat: ChatId,
instance: &mut Message,
) {
send_msg(alice, chat, instance).await.unwrap();
let alice_webxdc = alice.get_last_msg().await;

let webxdc = alice.pop_sent_msg().await;
Expand All @@ -952,17 +1041,19 @@ mod tests {
.unwrap();
let alice_advertisement = alice.pop_sent_msg().await;

send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
let bob_advertisement_future = send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
.await
.unwrap()
.unwrap();
let bob_advertisement = bob.pop_sent_msg().await;

eprintln!("Receiving advertisements");
bob.recv_msg_trash(&alice_advertisement).await;
alice.recv_msg_trash(&bob_advertisement).await;

eprintln!("Alice waits for connection");
eprintln!("Alice and Bob wait for connection");
alice_advertisement_future.await.unwrap();
bob_advertisement_future.await.unwrap();

// Alice sends ephemeral message
eprintln!("Sending ephemeral message");
Expand Down
2 changes: 1 addition & 1 deletion src/receive_imf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2141,7 +2141,7 @@ RETURNING id
created_db_entries.push(row_id);
}

// check all parts whether they contain a new logging webxdc
// Maybe set logging xdc and add gossip topics for webxdcs.
for (part, msg_id) in mime_parser.parts.iter().zip(&created_db_entries) {
// check if any part contains a webxdc topic id
if part.typ == Viewtype::Webxdc {
Expand Down