Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transition to a serial number for keeping clients in-sync #20

Merged
merged 1 commit into from
Feb 22, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 31 additions & 58 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use lazy_static::lazy_static;
use serde::Deserialize;
use uuid::Uuid;
use core::str;
use std::{collections::{HashSet, HashMap}, env, marker::PhantomData, sync::{Arc, atomic::{AtomicBool,Ordering}}, time::Instant};
use std::{collections::{HashSet, HashMap}, env, marker::PhantomData, sync::{Arc, atomic::{AtomicU64,Ordering}}, time::Instant};

use pwhash::bcrypt;

Expand All @@ -30,7 +30,6 @@ use libc::{mlockall, MCL_CURRENT, MCL_FUTURE, MCL_ONFAULT};
const THE_DATABASE: &str = "ratchet_db.redb";
// XXX: this has to be less than i64::MAX.
const AUTH_TIMEOUT_MINUTES: u64 = 30;
const MAX_POLL_RETRIES: usize = 3;

/// Wraps the tables to provide type protection based on the original declaration.
struct ReadWriteTable<'a, K, V, T>(TableDefinition<'a, K, V>, PhantomData<T>) where
Expand Down Expand Up @@ -121,13 +120,8 @@ lazy_static! {
Arc::new(&DB_KEY)
}
};
// rtp_notify_pollers waits indefinitely, but this is no good
// for using ratchet_pawl independent of a backend, so ignore
// this signal when no downstream is registered.
static ref ANY_PINS_REGISTERED: AtomicBool = AtomicBool::new(false);

// Always notify on backend failed state, until we signal some backend.
static ref ANY_PINS_FAILED: AtomicBool = AtomicBool::new(false);
// Recommend polling upon attach to subscribers.
static ref LONG_POLL_EPOCH: AtomicU64 = AtomicU64::new(1);
}
/// SAFETY: This must only be called once prior to operation of any
/// accessors or users of the Arc<PERM_DB_KEY>.
Expand Down Expand Up @@ -159,44 +153,16 @@ impl RatchetKeyed for RatchetUserEntry{
}
}

async fn rtp_notify_pollers() -> status::Custom<&'static str> {
let mut signalled = 0usize; // alternatives are like state hashing
let mut retries = 0usize;
while signalled == 0 && ANY_PINS_REGISTERED.load(Ordering::Relaxed) && retries < MAX_POLL_RETRIES {
{
let mut pins = RATCHET_POLL_PINS.lock().await;
while let Some(pin) = pins.pop() {
pin.send(true); // it's pub-sub, so it will leak;
// we don't know the subscriber is gone until after they fail to reconnect.
signalled += 1;
}
} // after iterating all the long pins
// ensure that we signal at least one poller, for each update
if signalled == 0 {
rocket::tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
retries += 1;
}

if retries >= MAX_POLL_RETRIES {
// TODO: This machinery is just begging to become a race condition,
// do not modify without refactoring into a struct.
ANY_PINS_REGISTERED.store(false, Ordering::Relaxed);
ANY_PINS_FAILED.store(true, Ordering::Relaxed);
return status::Custom(Status::ServiceUnavailable, "");
}

if signalled != 0 && ANY_PINS_FAILED.load(Ordering::Relaxed) {
ANY_PINS_FAILED.store(false, Ordering::Relaxed);
async fn rtp_notify_pollers() {
LONG_POLL_EPOCH.fetch_add(1, Ordering::Relaxed);
let mut pins = RATCHET_POLL_PINS.lock().await;
while let Some(pin) = pins.pop() {
pin.send(true); // it's pub-sub, so it will leak;
// we don't know the subscriber is gone until after they fail to reconnect.
}

if ANY_PINS_FAILED.load(Ordering::Relaxed) {
return status::Custom(Status::ServiceUnavailable, "");
}

return status::Custom(Status::Ok, "");
}


#[post("/rmuser", format = "multipart/form-data", data = "<username>")]
async fn rm_user(_admin: RatchetUser, username: Form<String>) -> status::Custom<&'static str> {
let mut users = RATCHET_USERS.lock().await;
Expand All @@ -212,7 +178,8 @@ async fn rm_user(_admin: RatchetUser, username: Form<String>) -> status::Custom<
},
None => (),
}
rtp_notify_pollers().await
rocket::tokio::spawn(rtp_notify_pollers());
status::Custom(Status::Ok, "")
},
None => status::Custom(Status::Gone, ""),
}
Expand All @@ -233,7 +200,8 @@ async fn add_user(_admin: RatchetUser, newuser: Form<RatchetUserEntry>) -> statu
users.insert(
newuser.username.clone(), new_entry
);
rtp_notify_pollers().await
rocket::tokio::spawn(rtp_notify_pollers());
status::Custom(Status::Ok, "")
} else {
// TODO: This doesn't exactly mean this anymore
status::Custom(Status::Conflict, "")
Expand Down Expand Up @@ -264,7 +232,8 @@ async fn edit_user(_admin: RatchetUser, edited: Form<RatchetUserEntry>) -> statu
},
None => (),
}
rtp_notify_pollers().await
rocket::tokio::spawn(rtp_notify_pollers());
status::Custom(Status::Ok, "")
} else {
// TODO: This doesn't exactly mean this anymore
status::Custom(Status::Gone, "")
Expand Down Expand Up @@ -308,7 +277,8 @@ async fn rm_dev(_admin: RatchetUser, network_id: Form<String>) -> status::Custom
match devs.remove(&*network_id) {
Some(dev) => {
RATCHET_DEVS_TABLE.rm(&dev).await.expect("Database error");
rtp_notify_pollers().await
rocket::tokio::spawn(rtp_notify_pollers());
status::Custom(Status::Ok, "")
},
None => status::Custom(Status::Gone, ""),
}
Expand All @@ -322,7 +292,8 @@ async fn add_dev(_admin: RatchetUser, newdev: Form<RatchetDevEntry>) -> status::
let new_dev = newdev.to_owned();
RATCHET_DEVS_TABLE.write(&new_dev).await.expect("Database error");
devs.insert(new_dev.network_id.clone(), new_dev);
rtp_notify_pollers().await
rocket::tokio::spawn(rtp_notify_pollers());
status::Custom(Status::Ok, "")
} else {
status::Custom(Status::Conflict, "")
}
Expand All @@ -337,7 +308,8 @@ async fn edit_dev(_admin: RatchetUser, edited: Form<RatchetDevEntry>) -> status:
let dev_update = edited.to_owned();
RATCHET_DEVS_TABLE.write(&dev_update).await.expect("Database error");
devs.insert(dev_update.network_id.clone(), dev_update);
rtp_notify_pollers().await
rocket::tokio::spawn(rtp_notify_pollers());
status::Custom(Status::Ok, "")
}
}

Expand Down Expand Up @@ -388,22 +360,23 @@ async fn api_dump_devs(_valid: RatchetApiKey) -> String {
)
}

#[get("/api/longpoll")]
async fn api_long_poll(_valid: RatchetApiKey) -> String {
// Initialization path
if !ANY_PINS_REGISTERED.load(Ordering::Relaxed) {
ANY_PINS_REGISTERED.store(true, Ordering::Relaxed);
return String::from("Update");
#[get("/api/longpoll?<serial>")]
async fn api_long_poll(_valid: RatchetApiKey, serial: Option<u64>) -> String {
// Serial number mismatch
let latest = LONG_POLL_EPOCH.load(Ordering::Relaxed);
if let Some(sn) = serial { // Option retains compatibility with legacy ratchet
if sn != latest { return String::from(format!("Update {}", latest)); } // output
}

// Normal path / waiting
let (tx, rx) = oneshot::channel();
{
let mut pins = RATCHET_POLL_PINS.lock().await;
pins.push(tx);
}
} // drop the pins

match rx.await {
Ok(_v) => String::from("Update"),
Ok(_v) => String::from(format!("Update {}", latest)),
Err(_) => String::from(""),
}
}
Expand Down