Skip to content

Commit

Permalink
fix(sync): added client.wait_for_start() and improve error handling…
Browse files Browse the repository at this point in the history
… in sync daemon (#495)

* fix(sync): improve error handling in daemon and add exponential backoff for server connection

- Added error logging in daemon before re-throwing errors
- Implemented exponential backoff when checking if the server is running
- Addresses issue ActivityWatch/aw-qt#105

* Apply suggestions from code review

* refactor: move wait_for_server() into client

* refactor: renamed wait_for_server to wait_for_start (same as in aw-client-python)
  • Loading branch information
ErikBjare authored Oct 17, 2024
1 parent 656f3c9 commit a0cdef9
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 16 deletions.
4 changes: 4 additions & 0 deletions aw-client-rust/src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,8 @@ impl AwClient {
proxy_method!(delete_event, (), bucketname: &str, event_id: i64);
proxy_method!(get_event_count, i64, bucketname: &str);
proxy_method!(get_info, aw_models::Info,);

pub fn wait_for_start(&self) -> Result<(), Box<dyn Error>> {
self.client.wait_for_start()
}
}
36 changes: 36 additions & 0 deletions aw-client-rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use std::{collections::HashMap, error::Error};

use chrono::{DateTime, Utc};
use serde_json::{json, Map};
use std::net::TcpStream;
use std::time::Duration;

pub use aw_models::{Bucket, BucketMetadata, Event};

Expand Down Expand Up @@ -221,4 +223,38 @@ impl AwClient {
let url = format!("{}/api/0/info", self.baseurl);
self.client.get(url).send().await?.json().await
}

// TODO: make async
pub fn wait_for_start(&self) -> Result<(), Box<dyn Error>> {
let socket_addrs = self.baseurl.socket_addrs(|| None)?;
let socket_addr = socket_addrs
.first()
.ok_or("Unable to resolve baseurl into socket address")?;

// Check if server is running with exponential backoff
let mut retry_delay = Duration::from_millis(100);
let max_wait = Duration::from_secs(10);
let mut total_wait = Duration::from_secs(0);

while total_wait < max_wait {
match TcpStream::connect_timeout(socket_addr, retry_delay) {
Ok(_) => break,
Err(_) => {
std::thread::sleep(retry_delay);
total_wait += retry_delay;
retry_delay *= 2;
}
}
}

if total_wait >= max_wait {
return Err(format!(
"Local server {} not running after 10 seconds of retrying",
socket_addr
)
.into());
}

Ok(())
}
}
21 changes: 15 additions & 6 deletions aw-sync/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,23 @@ fn main() -> Result<(), Box<dyn Error>> {

fn daemon(client: &AwClient) -> Result<(), Box<dyn Error>> {
loop {
info!("Pulling from all hosts");
sync_wrapper::pull_all(client)?;

info!("Pushing local data");
sync_wrapper::push(client)?;
if let Err(e) = daemon_sync_cycle(client) {
error!("Error during sync cycle: {}", e);
// Re-throw the error
return Err(e);
}

info!("Sync pass done, sleeping for 5 minutes");

std::thread::sleep(std::time::Duration::from_secs(300));
}
}

fn daemon_sync_cycle(client: &AwClient) -> Result<(), Box<dyn Error>> {
info!("Pulling from all hosts");
sync_wrapper::pull_all(client)?;

info!("Pushing local data");
sync_wrapper::push(client)?;

Ok(())
}
11 changes: 1 addition & 10 deletions aw-sync/src/sync_wrapper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::error::Error;
use std::fs;
use std::net::TcpStream;

use crate::sync::{sync_run, SyncMode, SyncSpec};
use aw_client_rust::blocking::AwClient;
Expand All @@ -14,15 +13,7 @@ pub fn pull_all(client: &AwClient) -> Result<(), Box<dyn Error>> {
}

pub fn pull(host: &str, client: &AwClient) -> Result<(), Box<dyn Error>> {
let socket_addrs = client.baseurl.socket_addrs(|| None)?;
let socket_addr = socket_addrs
.get(0)
.ok_or("Unable to resolve baseurl into socket address")?;

// Check if server is running
if TcpStream::connect(socket_addr).is_err() {
return Err(format!("Local server {} not running", &client.baseurl).into());
}
client.wait_for_start()?;

// Path to the sync folder
// Sync folder is structured ./{hostname}/{device_id}/test.db
Expand Down

0 comments on commit a0cdef9

Please sign in to comment.