Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 0a42e6e

Browse files
flush frames and confirm snapshot completion before calling SQLite checkpoint (#704)
* flush frames and confirm snapshot completion before calling SQLite checkpoint * make blocking rt multithreaded --------- Co-authored-by: ad hoc <postma.marin@protonmail.com>
1 parent 8fa0244 commit 0a42e6e

File tree

6 files changed

+110
-71
lines changed

6 files changed

+110
-71
lines changed

bottomless/src/backup.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ impl WalCopier {
5656
if let Some(wal) = self.wal.as_mut() {
5757
wal
5858
} else {
59-
return Err(anyhow!("WAL file not found: \"{:?}\"", self.wal_path));
59+
return Err(anyhow!("WAL file not found: `{}`", self.wal_path));
6060
}
6161
};
6262
let generation = if let Some(generation) = self.generation.load_full() {

bottomless/src/lib.rs

Lines changed: 38 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,31 @@ pub extern "C" fn xCheckpoint(
314314
tracing::trace!("Ignoring a checkpoint request weaker than TRUNCATE");
315315
return ffi::SQLITE_OK;
316316
}
317+
318+
let ctx = get_replicator_context(wal);
319+
let last_known_frame = ctx.replicator.last_known_frame();
320+
ctx.replicator.request_flush();
321+
if last_known_frame == 0 {
322+
tracing::debug!("No committed changes in this generation, not snapshotting");
323+
ctx.replicator.skip_snapshot_for_current_generation();
324+
return ffi::SQLITE_OK;
325+
}
326+
if let Err(e) = block_on!(
327+
ctx.runtime,
328+
ctx.replicator.wait_until_committed(last_known_frame)
329+
) {
330+
tracing::error!(
331+
"Failed to finalize frame {} replication: {}",
332+
last_known_frame,
333+
e
334+
);
335+
return ffi::SQLITE_IOERR_WRITE;
336+
}
337+
if let Err(e) = block_on!(ctx.runtime, ctx.replicator.wait_until_snapshotted()) {
338+
tracing::error!("Failed to finalize snapshot replication: {}", e);
339+
return ffi::SQLITE_IOERR_WRITE;
340+
}
341+
317342
/* If there's no busy handler, let's provide a default one,
318343
** since we auto-upgrade the passive checkpoint
319344
*/
@@ -342,31 +367,19 @@ pub extern "C" fn xCheckpoint(
342367
return rc;
343368
}
344369

345-
let ctx = get_replicator_context(wal);
346-
let last_known_frame = ctx.replicator.last_known_frame();
347-
ctx.replicator.request_flush();
348-
if last_known_frame == 0 {
349-
tracing::debug!("No committed changes in this generation, not snapshotting");
350-
ctx.replicator.skip_snapshot_for_current_generation();
351-
return ffi::SQLITE_OK;
352-
}
353-
if let Err(e) = block_on!(
354-
ctx.runtime,
355-
ctx.replicator.wait_until_committed(last_known_frame)
356-
) {
357-
tracing::error!("Failed to finalize replication: {}", e);
358-
return ffi::SQLITE_IOERR_WRITE;
359-
}
360-
361-
let prev = ctx.replicator.new_generation();
370+
let _prev = ctx.replicator.new_generation();
362371
tracing::debug!("Snapshotting after checkpoint");
363-
let result = block_on!(ctx.runtime, ctx.replicator.snapshot_main_db_file(prev));
364-
if let Err(e) = result {
365-
tracing::error!(
366-
"Failed to snapshot the main db file during checkpoint: {}",
367-
e
368-
);
369-
return ffi::SQLITE_IOERR_WRITE;
372+
match block_on!(ctx.runtime, ctx.replicator.snapshot_main_db_file()) {
373+
Ok(_handle) => {
374+
tracing::trace!("got snapshot handle");
375+
}
376+
Err(e) => {
377+
tracing::error!(
378+
"Failed to snapshot the main db file during checkpoint: {}",
379+
e
380+
);
381+
return ffi::SQLITE_IOERR_WRITE;
382+
}
370383
}
371384
tracing::debug!("Checkpoint completed in {:?}", Instant::now() - start);
372385

@@ -417,7 +430,7 @@ async fn try_restore(replicator: &mut replicator::Replicator) -> i32 {
417430
match replicator.restore(None, None).await {
418431
Ok((replicator::RestoreAction::SnapshotMainDbFile, _)) => {
419432
replicator.new_generation();
420-
match replicator.snapshot_main_db_file(None).await {
433+
match replicator.snapshot_main_db_file().await {
421434
Ok(Some(h)) => {
422435
if let Err(e) = h.await {
423436
tracing::error!("Failed to join snapshot main db file task: {}", e);

bottomless/src/replicator.rs

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -373,18 +373,28 @@ impl Replicator {
373373
self.last_sent_frame_no.load(Ordering::Acquire)
374374
}
375375

376-
pub async fn wait_until_snapshotted(&mut self, generation: Uuid) -> Result<()> {
377-
let res = self
378-
.snapshot_waiter
379-
.wait_for(|result| match result {
380-
Ok(Some(gen)) => *gen == generation,
381-
Ok(None) => false,
382-
Err(_) => true,
383-
})
384-
.await?;
385-
match res.deref() {
386-
Ok(_) => Ok(()),
387-
Err(e) => Err(anyhow!("Failed snapshot generation {}: {}", generation, e)),
376+
pub async fn wait_until_snapshotted(&mut self) -> Result<bool> {
377+
if let Ok(generation) = self.generation() {
378+
if !self.main_db_exists_and_not_empty().await {
379+
tracing::debug!("Not snapshotting, the main db file does not exist or is empty");
380+
let _ = self.snapshot_notifier.send(Ok(Some(generation)));
381+
return Ok(false);
382+
}
383+
tracing::debug!("waiting for generation snapshot {} to complete", generation);
384+
let res = self
385+
.snapshot_waiter
386+
.wait_for(|result| match result {
387+
Ok(Some(gen)) => *gen == generation,
388+
Ok(None) => false,
389+
Err(_) => true,
390+
})
391+
.await?;
392+
match res.deref() {
393+
Ok(_) => Ok(true),
394+
Err(e) => Err(anyhow!("Failed snapshot generation {}: {}", generation, e)),
395+
}
396+
} else {
397+
Ok(false)
388398
}
389399
}
390400

@@ -706,23 +716,18 @@ impl Replicator {
706716
// Sends the main database file to S3 - if -wal file is present, it's replicated
707717
// too - it means that the local file was detected to be newer than its remote
708718
// counterpart.
709-
pub async fn snapshot_main_db_file(
710-
&mut self,
711-
prev_generation: Option<Uuid>,
712-
) -> Result<Option<JoinHandle<()>>> {
719+
pub async fn snapshot_main_db_file(&mut self) -> Result<Option<JoinHandle<()>>> {
713720
if !self.main_db_exists_and_not_empty().await {
714-
tracing::debug!("Not snapshotting, the main db file does not exist or is empty");
715-
let _ = self.snapshot_notifier.send(Ok(prev_generation));
721+
let generation = self.generation()?;
722+
tracing::debug!(
723+
"Not snapshotting {}, the main db file does not exist or is empty",
724+
generation
725+
);
726+
let _ = self.snapshot_notifier.send(Ok(Some(generation)));
716727
return Ok(None);
717728
}
718729
let generation = self.generation()?;
719-
tracing::debug!("Snapshotting generation {}", generation);
720730
let start_ts = Instant::now();
721-
if let Some(prev) = prev_generation {
722-
tracing::debug!("waiting for previous generation {} to complete", prev);
723-
self.wait_until_snapshotted(prev).await?;
724-
}
725-
726731
let client = self.client.clone();
727732
let mut db_file = File::open(&self.db_path).await?;
728733
let change_counter = Self::read_change_counter(&mut db_file).await?;
@@ -749,6 +754,7 @@ impl Replicator {
749754
let snapshot_notifier = self.snapshot_notifier.clone();
750755
let compression = self.use_compression;
751756
let handle = tokio::spawn(async move {
757+
tracing::trace!("Start snapshotting generation {}", generation);
752758
let start = Instant::now();
753759
let body = match Self::maybe_compress_main_db_file(db_file, compression).await {
754760
Ok(file) => file,
@@ -788,7 +794,7 @@ impl Replicator {
788794
let _ = tokio::fs::remove_file(format!("db.{}", compression)).await;
789795
});
790796
let elapsed = Instant::now() - start_ts;
791-
tracing::debug!("Scheduled DB snapshot (took {:?})", elapsed);
797+
tracing::debug!("Scheduled DB snapshot {} (took {:?})", generation, elapsed);
792798

793799
Ok(Some(handle))
794800
}

sqld/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,9 @@ const DB_CREATE_TIMEOUT: Duration = Duration::from_secs(1);
6969
const DEFAULT_AUTO_CHECKPOINT: u32 = 1000;
7070

7171
pub(crate) static BLOCKING_RT: Lazy<Runtime> = Lazy::new(|| {
72-
tokio::runtime::Builder::new_current_thread()
72+
tokio::runtime::Builder::new_multi_thread()
7373
.max_blocking_threads(50_000)
74+
.enable_all()
7475
.build()
7576
.unwrap()
7677
});

sqld/src/namespace/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -922,7 +922,9 @@ pub async fn init_bottomless_replicator(
922922
match action {
923923
bottomless::replicator::RestoreAction::SnapshotMainDbFile => {
924924
replicator.new_generation();
925-
replicator.snapshot_main_db_file(None).await?;
925+
if let Some(_handle) = replicator.snapshot_main_db_file().await? {
926+
tracing::trace!("got snapshot handle after restore with generation upgrade");
927+
}
926928
// Restoration process only leaves the local WAL file if it was
927929
// detected to be newer than its remote counterpart.
928930
replicator.maybe_replicate_wal().await?

sqld/src/replication/primary/logger.rs

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,39 @@ unsafe impl WalHook for ReplicationLoggerHook {
203203
return SQLITE_BUSY;
204204
}
205205
}
206+
207+
#[allow(clippy::await_holding_lock)]
208+
// uncontended -> only gets called under a libSQL write lock
209+
{
210+
let ctx = Self::wal_extract_ctx(wal);
211+
let runtime = tokio::runtime::Handle::current();
212+
if let Some(replicator) = ctx.bottomless_replicator.as_mut() {
213+
let mut replicator = replicator.lock().unwrap();
214+
let last_known_frame = replicator.last_known_frame();
215+
replicator.request_flush();
216+
if last_known_frame == 0 {
217+
tracing::debug!("No comitted changes in this generation, not snapshotting");
218+
replicator.skip_snapshot_for_current_generation();
219+
return SQLITE_OK;
220+
}
221+
if let Err(e) = runtime.block_on(replicator.wait_until_committed(last_known_frame))
222+
{
223+
tracing::error!(
224+
"Failed to wait for S3 replicator to confirm {} frames backup: {}",
225+
last_known_frame,
226+
e
227+
);
228+
return SQLITE_IOERR_WRITE;
229+
}
230+
if let Err(e) = runtime.block_on(replicator.wait_until_snapshotted()) {
231+
tracing::error!(
232+
"Failed to wait for S3 replicator to confirm database snapshot backup: {}",
233+
e
234+
);
235+
return SQLITE_IOERR_WRITE;
236+
}
237+
}
238+
}
206239
let rc = unsafe {
207240
orig(
208241
wal,
@@ -229,25 +262,9 @@ unsafe impl WalHook for ReplicationLoggerHook {
229262
let runtime = tokio::runtime::Handle::current();
230263
if let Some(replicator) = ctx.bottomless_replicator.as_mut() {
231264
let mut replicator = replicator.lock().unwrap();
232-
let last_known_frame = replicator.last_known_frame();
233-
replicator.request_flush();
234-
if last_known_frame == 0 {
235-
tracing::debug!("No comitted changes in this generation, not snapshotting");
236-
replicator.skip_snapshot_for_current_generation();
237-
return SQLITE_OK;
238-
}
239-
if let Err(e) = runtime.block_on(replicator.wait_until_committed(last_known_frame))
240-
{
241-
tracing::error!(
242-
"Failed to wait for S3 replicator to confirm {} frames backup: {}",
243-
last_known_frame,
244-
e
245-
);
246-
return SQLITE_IOERR_WRITE;
247-
}
248-
let prev = replicator.new_generation();
265+
let _prev = replicator.new_generation();
249266
if let Err(e) =
250-
runtime.block_on(async move { replicator.snapshot_main_db_file(prev).await })
267+
runtime.block_on(async move { replicator.snapshot_main_db_file().await })
251268
{
252269
tracing::error!("Failed to snapshot the main db file during checkpoint: {e}");
253270
return SQLITE_IOERR_WRITE;

0 commit comments

Comments (0)