From 379fe7a2a3bc31d8532c45695283de7a5778ea17 Mon Sep 17 00:00:00 2001 From: James Newton Date: Fri, 6 Sep 2024 09:43:59 -0700 Subject: [PATCH] Only add to last_frames_synced if frames did change. Fixes an issue where a returned Replicated#frames_synced will continuously increment based off the replicator.frames_synced() even if no new frames have synced. --- libsql-server/tests/embedded_replica/mod.rs | 121 +++++++++++++++++++- libsql/src/replication/mod.rs | 23 +++- 2 files changed, 138 insertions(+), 6 deletions(-) diff --git a/libsql-server/tests/embedded_replica/mod.rs b/libsql-server/tests/embedded_replica/mod.rs index 0288c9a117..46fd659980 100644 --- a/libsql-server/tests/embedded_replica/mod.rs +++ b/libsql-server/tests/embedded_replica/mod.rs @@ -1359,7 +1359,7 @@ fn replicated_return() { let rep = db.sync().await.unwrap(); assert_eq!(rep.frame_no(), Some(4)); - assert_eq!(rep.frames_synced(), 3); + assert_eq!(rep.frames_synced(), 5); let mut row = conn.query("select count(*) from user", ()).await.unwrap(); let count = row.next().await.unwrap().unwrap().get::(0).unwrap(); @@ -1518,3 +1518,122 @@ fn replicate_auth() { sim.run().unwrap(); } + +#[test] +fn replicated_synced_frames_zero_when_no_data_synced() { + let tmp_embedded = tempdir().unwrap(); + let tmp_embedded_path = tmp_embedded.path().to_owned(); + + let mut sim = Builder::new() + .simulation_duration(Duration::from_secs(1000)) + .build(); + let tmp = tempdir().unwrap(); + + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + + init_tracing(); + sim.host("primary", move || { + let notify = notify_clone.clone(); + let path = tmp.path().to_path_buf(); + async move { + let make_server = || async { + TestServer { + path: path.clone().into(), + user_api_config: UserApiConfig { + ..Default::default() + }, + admin_api_config: Some(AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(), + connector: TurmoilConnector, + disable_metrics: true, + auth_key: None, + }), + rpc_server_config: Some(RpcServerConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await.unwrap(), + tls_config: None, + }), + ..Default::default() + } + }; + + let server = make_server().await; + let shutdown = server.shutdown.clone(); + + let fut = async move { server.start_sim(8080).await }; + + tokio::pin!(fut); + + loop { + tokio::select! { + res = &mut fut => { + res.unwrap(); + break + } + _ = notify.notified() => { + shutdown.notify_waiters(); + }, + } + } + + drop(fut); + + tokio::fs::File::create(path.join("dbs").join("default").join(".sentinel")) + .await + .unwrap(); + + notify.notify_waiters(); + let server = make_server().await; + server.start_sim(8080).await.unwrap(); + + Ok(()) + } + }); + + sim.client("client", async move { + let path = tmp_embedded_path.join("embedded"); + let db = Database::open_with_remote_sync_connector( + path.to_str().unwrap(), + "http://primary:8080", + "", + TurmoilConnector, + false, + None, + ) + .await?; + + let rep = db.sync().await.unwrap(); + assert_eq!(rep.frame_no(), None); + assert_eq!(rep.frames_synced(), 0); + + let conn = db.connect()?; + + conn.execute("CREATE TABLE user (id INTEGER)", ()) + .await + .unwrap(); + + let rep = db.sync().await.unwrap(); + assert_eq!(rep.frame_no(), Some(1)); + assert_eq!(rep.frames_synced(), 2); + + conn.execute("INSERT into user(id) values (randomblob(4096));", ()) + .await + .unwrap(); + + let rep = db.sync().await.unwrap(); + assert_eq!(rep.frame_no(), Some(4)); + assert_eq!(rep.frames_synced(), 3); + + let rep = db.sync().await.unwrap(); + assert_eq!(rep.frame_no(), Some(4)); + assert_eq!(rep.frames_synced(), 0); + + let rep = db.sync().await.unwrap(); + assert_eq!(rep.frame_no(), Some(4)); + assert_eq!(rep.frames_synced(), 0); + + Ok(()) + }); + + sim.run().unwrap(); +} diff --git a/libsql/src/replication/mod.rs b/libsql/src/replication/mod.rs index b7f5b36ccf..851f8f238f 100644 --- a/libsql/src/replication/mod.rs +++ b/libsql/src/replication/mod.rs @@ -256,13 +256,26 @@ impl EmbeddedReplicator { } } - let last_frames_synced = self.last_frames_synced.fetch_add( - replicator.frames_synced(), - std::sync::atomic::Ordering::Relaxed, - ); + let current_frames_synced = replicator.frames_synced(); + + let mut last_frames_synced = self + .last_frames_synced + .load(std::sync::atomic::Ordering::Relaxed); + + while current_frames_synced > last_frames_synced { + match self.last_frames_synced.compare_exchange( + last_frames_synced, + current_frames_synced, + std::sync::atomic::Ordering::Relaxed, + std::sync::atomic::Ordering::Relaxed, + ) { + Ok(_) => break, + Err(current_value) => last_frames_synced = current_value, + } + } let frames_synced = - ((replicator.frames_synced() as i64 - last_frames_synced as i64).abs()) as usize; + ((current_frames_synced as i64 - last_frames_synced as i64).abs()) as usize; let replicated = Replicated { frame_no: replicator.client_mut().committed_frame_no(),