Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 7c7b1cf

Browse files
committed
fix tests
1 parent 2ef424c commit 7c7b1cf

14 files changed

+463
-802
lines changed

sqld/src/error.rs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,32 @@ impl From<bincode::Error> for Error {
147147
}
148148
}
149149

150+
macro_rules! internal_from {
151+
($to:ty => { $($from:ty,)* }) => {
152+
$(
153+
impl From<$from> for $to {
154+
fn from(v: $from) -> Self {
155+
<$to>::Internal(v.to_string())
156+
}
157+
}
158+
)*
159+
160+
};
161+
}
162+
163+
internal_from! {
164+
LoadDumpError => {
165+
std::io::Error,
166+
rusqlite::Error,
167+
hyper::Error,
168+
tokio::task::JoinError,
169+
}
170+
}
171+
150172
#[derive(Debug, thiserror::Error)]
151173
pub enum LoadDumpError {
152-
#[error("IO error: {0}")]
153-
Io(#[from] std::io::Error),
174+
#[error("internal error: {0}")]
175+
Internal(String),
154176
#[error("Cannot load a dump on a replica")]
155177
ReplicaLoadDump,
156178
#[error("cannot load from a dump if a database already exists")]
@@ -161,10 +183,12 @@ pub enum LoadDumpError {
161183
DumpFileDoesntExist,
162184
#[error("invalid dump url")]
163185
InvalidDumpUrl,
164-
#[error("error fetching dump: {0}")]
165-
Fetch(#[from] hyper::Error),
166186
#[error("unsupported dump url scheme `{0}`, supported schemes are: `http`, `file`")]
167187
UnsupportedUrlScheme(String),
188+
#[error("a dump should execute within a transaction.")]
189+
NoTxn,
190+
#[error("the dump should commit the transaction.")]
191+
NoCommit,
168192
}
169193

170194
impl ResponseError for LoadDumpError {}
@@ -174,12 +198,14 @@ impl IntoResponse for LoadDumpError {
174198
use LoadDumpError::*;
175199

176200
match &self {
177-
Io(_) | Fetch(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
201+
Internal(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
178202
ReplicaLoadDump
179203
| LoadDumpExistingDb
180204
| InvalidDumpUrl
181205
| DumpFileDoesntExist
182206
| UnsupportedUrlScheme(_)
207+
| NoTxn
208+
| NoCommit
183209
| DumpFilePathNotAbsolute => self.format_err(StatusCode::BAD_REQUEST),
184210
}
185211
}

sqld/src/namespace/mod.rs

Lines changed: 49 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,20 @@ use std::fmt;
44
use std::path::{Path, PathBuf};
55
use std::sync::{Arc, Weak};
66

7-
use anyhow::{bail, Context as _, ensure};
7+
use anyhow::Context as _;
88
use async_lock::{RwLock, RwLockUpgradableReadGuard};
99
use bottomless::replicator::Options;
1010
use bytes::Bytes;
1111
use chrono::NaiveDateTime;
1212
use enclose::enclose;
1313
use futures_core::Stream;
1414
use hyper::Uri;
15+
use parking_lot::Mutex;
1516
use rusqlite::ErrorCode;
1617
use sqld_libsql_bindings::wal_hook::TRANSPARENT_METHODS;
1718
use tokio::io::AsyncBufReadExt;
1819
use tokio::sync::watch;
19-
use tokio::task::{block_in_place, JoinSet};
20+
use tokio::task::JoinSet;
2021
use tokio::time::Duration;
2122
use tokio_util::io::StreamReader;
2223
use tonic::transport::Channel;
@@ -76,7 +77,8 @@ impl NamespaceName {
7677
}
7778

7879
pub fn as_str(&self) -> &str {
79-
std::str::from_utf8(&self.0).unwrap()
80+
// Safety: the namespace is always valid UTF8
81+
unsafe { std::str::from_utf8_unchecked(&self.0) }
8082
}
8183

8284
pub fn from_bytes(bytes: Bytes) -> crate::Result<Self> {
@@ -640,6 +642,25 @@ impl Namespace<PrimaryDatabase> {
640642
name: NamespaceName,
641643
restore_option: RestoreOption,
642644
allow_creation: bool,
645+
) -> crate::Result<Self> {
646+
// FIXME: make that truly atomic. explore the idea of using temp directories, and it's implications
647+
match Self::try_new_primary(config, name.clone(), restore_option, allow_creation).await {
648+
Ok(ns) => Ok(ns),
649+
Err(e) => {
650+
let path = config.base_path.join("dbs").join(name.as_str());
651+
if let Err(e) = tokio::fs::remove_dir_all(path).await {
652+
tracing::error!("failed to clean dirty namespace: {e}");
653+
}
654+
Err(e)
655+
}
656+
}
657+
}
658+
659+
async fn try_new_primary(
660+
config: &PrimaryNamespaceConfig,
661+
name: NamespaceName,
662+
restore_option: RestoreOption,
663+
allow_creation: bool,
643664
) -> crate::Result<Self> {
644665
// if namespaces are disabled, then we allow creation for the default namespace.
645666
let allow_creation =
@@ -842,22 +863,20 @@ async fn load_dump<S>(
842863
dump: S,
843864
mk_ctx: impl Fn() -> ReplicationLoggerHookCtx,
844865
auto_checkpoint: u32,
845-
) -> anyhow::Result<()>
866+
) -> crate::Result<(), LoadDumpError>
846867
where
847868
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
848869
{
849870
let mut retries = 0;
850871
// there is a small chance we fail to acquire the lock right away, so we perform a few retries
851872
let conn = loop {
852-
match block_in_place(|| {
853-
open_conn(
854-
db_path,
855-
&REPLICATION_METHODS,
856-
mk_ctx(),
857-
None,
858-
auto_checkpoint,
859-
)
860-
}) {
873+
let ctx = mk_ctx();
874+
let db_path = db_path.to_path_buf();
875+
match tokio::task::spawn_blocking(move || {
876+
open_conn(&db_path, &REPLICATION_METHODS, ctx, None, auto_checkpoint)
877+
})
878+
.await?
879+
{
861880
Ok(conn) => {
862881
break conn;
863882
}
@@ -874,20 +893,19 @@ where
874893
retries += 1;
875894
tokio::time::sleep(Duration::from_millis(100)).await;
876895
}
877-
Err(e) => {
878-
bail!(e);
879-
}
896+
Err(e) => Err(e)?,
880897
}
881898
};
882899

900+
let conn = Arc::new(Mutex::new(conn));
901+
883902
let mut reader = tokio::io::BufReader::new(StreamReader::new(dump));
884903
let mut curr = String::new();
885904
let mut line = String::new();
886905
let mut skipped_wasm_table = false;
887906
let mut n_stmt = 0;
888907

889908
while let Ok(n) = reader.read_line(&mut curr).await {
890-
891909
if n == 0 {
892910
break;
893911
}
@@ -911,20 +929,28 @@ where
911929

912930
if line.ends_with(';') {
913931
n_stmt += 1;
914-
if n_stmt > 2 {
915-
ensure!(!conn.is_autocommit(), "a dump should execute within a transaction.");
932+
// dump must be performd within a txn
933+
if n_stmt > 2 && conn.lock().is_autocommit() {
934+
return Err(LoadDumpError::NoTxn);
916935
}
917936

918-
block_in_place(|| conn.execute(&line, ()))?;
937+
line = tokio::task::spawn_blocking({
938+
let conn = conn.clone();
939+
move || -> crate::Result<String, LoadDumpError> {
940+
conn.lock().execute(&line, ())?;
941+
Ok(line)
942+
}
943+
})
944+
.await??;
919945
line.clear();
920946
} else {
921947
line.push(' ');
922948
}
923949
}
924950

925-
if !conn.is_autocommit() {
926-
let _ = conn.execute("rollback", ());
927-
bail!("the dump should commit the transaction.");
951+
if !conn.lock().is_autocommit() {
952+
let _ = conn.lock().execute("rollback", ());
953+
return Err(LoadDumpError::NoCommit);
928954
}
929955

930956
Ok(())

0 commit comments

Comments
 (0)