Skip to content

Commit

Permalink
Merge pull request tursodatabase#1769 from tursodatabase/libsql-wal-r…
Browse files Browse the repository at this point in the history
…eplication-fixes

libsql-wal replication fixed
  • Loading branch information
MarinPostma authored Oct 3, 2024
2 parents 1c0da12 + db9d6e1 commit 87001c0
Show file tree
Hide file tree
Showing 11 changed files with 311 additions and 118 deletions.
25 changes: 13 additions & 12 deletions libsql-server/src/connection/connection_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::time::{Duration, Instant};
use libsql_sys::wal::{Wal, WalManager};
use metrics::histogram;
use parking_lot::Mutex;
use tokio::sync::watch;

use crate::connection::legacy::open_conn_active_checkpoint;
use crate::error::Error;
Expand All @@ -24,13 +23,15 @@ use crate::{Result, BLOCKING_RT};
use super::config::DatabaseConfig;
use super::program::{DescribeCol, DescribeParam, DescribeResponse, Program, Vm};

/// Shared, thread-safe callback that, when invoked, returns the frame number a
/// connection should currently report, as an `Option<FrameNo>`.
///
/// Being an `Arc<dyn Fn>`, it is cloned cheaply into every connection that needs it.
/// NOTE(review): `None` presumably means no frame is available yet — confirm against
/// the closures supplied by the callers.
pub type GetCurrentFrameNo = Arc<dyn Fn() -> Option<FrameNo> + Send + Sync + 'static>;

/// The base connection type, shared between legacy and libsql-wal implementations
pub(super) struct CoreConnection<W> {
conn: libsql_sys::Connection<W>,
stats: Arc<Stats>,
config_store: MetaStoreHandle,
builder_config: QueryBuilderConfig,
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
get_current_frame_no: GetCurrentFrameNo,
block_writes: Arc<AtomicBool>,
resolve_attach_path: ResolveNamespacePathFn,
forced_rollback: bool,
Expand Down Expand Up @@ -65,7 +66,7 @@ impl<W: Wal + Send + 'static> CoreConnection<W> {
broadcaster: BroadcasterHandle,
config_store: MetaStoreHandle,
builder_config: QueryBuilderConfig,
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
get_current_frame_no: GetCurrentFrameNo,
block_writes: Arc<AtomicBool>,
resolve_attach_path: ResolveNamespacePathFn,
) -> Result<Self> {
Expand Down Expand Up @@ -118,13 +119,13 @@ impl<W: Wal + Send + 'static> CoreConnection<W> {
stats,
config_store,
builder_config,
current_frame_no_receiver,
block_writes,
resolve_attach_path,
forced_rollback: false,
broadcaster,
hooked: false,
canceled,
get_current_frame_no,
};

for ext in extensions.iter() {
Expand Down Expand Up @@ -265,9 +266,9 @@ impl<W: Wal + Send + 'static> CoreConnection<W> {
}

{
let mut lock = this.lock();
let lock = this.lock();
let is_autocommit = lock.conn.is_autocommit();
let current_fno = *lock.current_frame_no_receiver.borrow_and_update();
let current_fno = (lock.get_current_frame_no)();
vm.builder().finish(current_fno, is_autocommit)?;
}

Expand Down Expand Up @@ -424,13 +425,13 @@ mod test {
stats: Arc::new(Stats::default()),
config_store: MetaStoreHandle::new_test(),
builder_config: QueryBuilderConfig::default(),
current_frame_no_receiver: watch::channel(None).1,
block_writes: Default::default(),
resolve_attach_path: Arc::new(|_| unreachable!()),
forced_rollback: false,
broadcaster: Default::default(),
hooked: false,
canceled: Arc::new(false.into()),
get_current_frame_no: Arc::new(|| None),
};

let conn = Arc::new(Mutex::new(conn));
Expand Down Expand Up @@ -465,7 +466,7 @@ mod test {
100000000,
100000000,
DEFAULT_AUTO_CHECKPOINT,
watch::channel(None).1,
Arc::new(|| None),
None,
Default::default(),
Arc::new(|_| unreachable!()),
Expand Down Expand Up @@ -511,7 +512,7 @@ mod test {
100000000,
100000000,
DEFAULT_AUTO_CHECKPOINT,
watch::channel(None).1,
Arc::new(|| None),
None,
Default::default(),
Arc::new(|_| unreachable!()),
Expand Down Expand Up @@ -562,7 +563,7 @@ mod test {
100000000,
100000000,
DEFAULT_AUTO_CHECKPOINT,
watch::channel(None).1,
Arc::new(|| None),
None,
Default::default(),
Arc::new(|_| unreachable!()),
Expand Down Expand Up @@ -645,7 +646,7 @@ mod test {
100000000,
100000000,
DEFAULT_AUTO_CHECKPOINT,
watch::channel(None).1,
Arc::new(|| None),
None,
Default::default(),
Arc::new(|_| unreachable!()),
Expand Down Expand Up @@ -738,7 +739,7 @@ mod test {
100000000,
100000000,
DEFAULT_AUTO_CHECKPOINT,
watch::channel(None).1,
Arc::new(|| None),
None,
Default::default(),
Arc::new(|_| unreachable!()),
Expand Down
15 changes: 7 additions & 8 deletions libsql-server/src/connection/legacy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use libsql_sys::EncryptionConfig;
use parking_lot::Mutex;
use rusqlite::ffi::SQLITE_BUSY;
use rusqlite::{ErrorCode, OpenFlags};
use tokio::sync::watch;
use tokio::time::Duration;

use crate::error::Error;
Expand All @@ -22,7 +21,7 @@ use crate::replication::FrameNo;
use crate::stats::Stats;
use crate::{record_time, Result};

use super::connection_core::CoreConnection;
use super::connection_core::{CoreConnection, GetCurrentFrameNo};

use super::connection_manager::{
ConnectionManager, InnerWalManager, ManagedConnectionWal, ManagedConnectionWalWrapper,
Expand All @@ -40,7 +39,7 @@ pub struct MakeLegacyConnection<W> {
max_response_size: u64,
max_total_response_size: u64,
auto_checkpoint: u32,
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
get_current_frame_no: GetCurrentFrameNo,
connection_manager: ConnectionManager,
/// return sqlite busy. To mitigate that, we hold on to one connection
_db: Option<LegacyConnection<W>>,
Expand All @@ -65,7 +64,7 @@ where
max_response_size: u64,
max_total_response_size: u64,
auto_checkpoint: u32,
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
current_frame_no: GetCurrentFrameNo,
encryption_config: Option<EncryptionConfig>,
block_writes: Arc<AtomicBool>,
resolve_attach_path: ResolveNamespacePathFn,
Expand All @@ -82,7 +81,7 @@ where
max_response_size,
max_total_response_size,
auto_checkpoint,
current_frame_no_receiver,
get_current_frame_no: current_frame_no,
_db: None,
wal_wrapper,
encryption_config,
Expand Down Expand Up @@ -142,7 +141,7 @@ where
auto_checkpoint: self.auto_checkpoint,
encryption_config: self.encryption_config.clone(),
},
self.current_frame_no_receiver.clone(),
self.get_current_frame_no.clone(),
self.block_writes.clone(),
self.resolve_attach_path.clone(),
self.connection_manager.clone(),
Expand Down Expand Up @@ -185,7 +184,7 @@ impl LegacyConnection<libsql_sys::wal::wrapper::PassthroughWalWrapper> {
Default::default(),
MetaStoreHandle::new_test(),
QueryBuilderConfig::default(),
tokio::sync::watch::channel(None).1,
Arc::new(|| None),
Default::default(),
Arc::new(|_| unreachable!()),
ConnectionManager::new(TXN_TIMEOUT),
Expand Down Expand Up @@ -321,7 +320,7 @@ where
broadcaster: BroadcasterHandle,
config_store: MetaStoreHandle,
builder_config: QueryBuilderConfig,
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
current_frame_no_receiver: GetCurrentFrameNo,
block_writes: Arc<AtomicBool>,
resolve_attach_path: ResolveNamespacePathFn,
connection_manager: ConnectionManager,
Expand Down
7 changes: 3 additions & 4 deletions libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use libsql_sys::EncryptionConfig;
use libsql_wal::io::StdIO;
use libsql_wal::wal::{LibsqlWal, LibsqlWalManager};
use parking_lot::Mutex;
use tokio::sync::watch;

use crate::connection::program::check_program_auth;
use crate::metrics::DESCRIBE_COUNT;
Expand All @@ -19,7 +18,7 @@ use crate::stats::Stats;
use crate::Result;
use crate::{record_time, SqldStorage, BLOCKING_RT};

use super::connection_core::CoreConnection;
use super::connection_core::{CoreConnection, GetCurrentFrameNo};
use super::program::{check_describe_auth, DescribeResponse, Program};
use super::{MakeConnection, RequestContext};

Expand All @@ -36,7 +35,7 @@ pub struct MakeLibsqlConnectionInner {
pub(crate) max_response_size: u64,
pub(crate) max_total_response_size: u64,
pub(crate) auto_checkpoint: u32,
pub(crate) current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
pub(crate) get_current_frame_no: GetCurrentFrameNo,
pub(crate) encryption_config: Option<EncryptionConfig>,
pub(crate) block_writes: Arc<AtomicBool>,
pub(crate) resolve_attach_path: ResolveNamespacePathFn,
Expand Down Expand Up @@ -67,7 +66,7 @@ impl MakeConnection for MakeLibsqlConnection {
inner.broadcaster.clone(),
inner.config_store.clone(),
builder_config,
inner.current_frame_no_receiver.clone(),
inner.get_current_frame_no.clone(),
inner.block_writes.clone(),
inner.resolve_attach_path.clone(),
)
Expand Down
43 changes: 24 additions & 19 deletions libsql-server/src/connection/write_proxy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use futures::Future;
use futures_core::future::BoxFuture;
use futures_core::Stream;
use libsql_replication::rpc::proxy::proxy_client::ProxyClient;
Expand All @@ -9,7 +11,7 @@ use libsql_replication::rpc::proxy::{
};
use libsql_sys::EncryptionConfig;
use parking_lot::Mutex as PMutex;
use tokio::sync::{mpsc, watch, Mutex};
use tokio::sync::{mpsc, Mutex};
use tokio_stream::StreamExt;
use tonic::transport::Channel;
use tonic::{Code, Request, Streaming};
Expand All @@ -23,16 +25,21 @@ use crate::replication::FrameNo;
use crate::stats::Stats;
use crate::{Result, DEFAULT_AUTO_CHECKPOINT};

use super::connection_core::GetCurrentFrameNo;
use super::program::DescribeResponse;
use super::{Connection, RequestContext};
use super::{MakeConnection, Program};

/// A tonic `Streaming` of `ExecResp` messages.
pub type RpcStream = Streaming<ExecResp>;
/// Shared, thread-safe callback that takes a `FrameNo` and returns a boxed `Send`
/// future resolving to `()`.
///
/// NOTE(review): presumably the future completes once the local replica has applied
/// at least the given frame number — confirm against the closures supplied by callers.
pub type WaitForFrameNo = Arc<
dyn Fn(FrameNo) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + 'static + Sync,
>;

pub struct MakeWriteProxyConn<M> {
client: ProxyClient<Channel>,
stats: Arc<Stats>,
applied_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
wait_for_frame_no: WaitForFrameNo,
get_current_frame_no: GetCurrentFrameNo,
max_response_size: u64,
max_total_response_size: u64,
primary_replication_index: Option<FrameNo>,
Expand All @@ -47,23 +54,25 @@ impl<M> MakeWriteProxyConn<M> {
channel: Channel,
uri: tonic::transport::Uri,
stats: Arc<Stats>,
applied_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
wait_for_frame_no: WaitForFrameNo,
max_response_size: u64,
max_total_response_size: u64,
primary_replication_index: Option<FrameNo>,
encryption_config: Option<EncryptionConfig>,
make_read_only_conn: M,
get_current_frame_no: GetCurrentFrameNo,
) -> Self {
let client = ProxyClient::with_origin(channel, uri);
Self {
client,
stats,
applied_frame_no_receiver,
wait_for_frame_no,
max_response_size,
max_total_response_size,
make_read_only_conn,
primary_replication_index,
encryption_config,
get_current_frame_no,
}
}
}
Expand All @@ -78,14 +87,15 @@ where
Ok(WriteProxyConnection::new(
self.client.clone(),
self.stats.clone(),
self.applied_frame_no_receiver.clone(),
self.wait_for_frame_no.clone(),
QueryBuilderConfig {
max_size: Some(self.max_response_size),
max_total_size: Some(self.max_total_response_size),
auto_checkpoint: DEFAULT_AUTO_CHECKPOINT,
encryption_config: self.encryption_config.clone(),
},
self.primary_replication_index,
self.get_current_frame_no.clone(),
self.make_read_only_conn.create().await?,
)?)
}
Expand All @@ -100,8 +110,9 @@ pub struct WriteProxyConnection<R, C> {
/// any subsequent read on this connection must wait for the replicator to catch up with this
/// frame_no
last_write_frame_no: PMutex<Option<FrameNo>>,
/// Notifier from the repliator of the currently applied frameno
applied_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
/// Notifier from the replicator of the currently applied frame_no
wait_for_frame_no: WaitForFrameNo,
get_current_frame_no: GetCurrentFrameNo,
builder_config: QueryBuilderConfig,
stats: Arc<Stats>,

Expand All @@ -115,21 +126,23 @@ impl<C: Connection> WriteProxyConnection<RpcStream, C> {
fn new(
write_proxy: ProxyClient<Channel>,
stats: Arc<Stats>,
applied_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
wait_for_frame_no: WaitForFrameNo,
builder_config: QueryBuilderConfig,
primary_replication_index: Option<u64>,
get_current_frame_no: GetCurrentFrameNo,
read_conn: C,
) -> Result<Self> {
Ok(Self {
read_conn,
write_proxy,
state: Mutex::new(TxnStatus::Init),
last_write_frame_no: Default::default(),
applied_frame_no_receiver,
wait_for_frame_no,
builder_config,
stats,
remote_conn: Default::default(),
primary_replication_index,
get_current_frame_no,
})
}

Expand Down Expand Up @@ -200,15 +213,7 @@ impl<C: Connection> WriteProxyConnection<RpcStream, C> {
let current_fno = replication_index.or_else(|| *self.last_write_frame_no.lock());
match current_fno {
Some(current_frame_no) => {
let mut receiver = self.applied_frame_no_receiver.clone();
receiver
.wait_for(|last_applied| match last_applied {
Some(x) => *x >= current_frame_no,
None => true,
})
.await
.map_err(|_| Error::ReplicatorExited)?;

(self.wait_for_frame_no)(current_frame_no).await;
Ok(())
}
None => Ok(()),
Expand All @@ -220,7 +225,7 @@ impl<C: Connection> WriteProxyConnection<RpcStream, C> {
fn should_proxy(&self) -> bool {
// The primary has data
if let Some(primary_index) = self.primary_replication_index {
let last_applied = *self.applied_frame_no_receiver.borrow();
let last_applied = (self.get_current_frame_no)();
// if we either don't have data while the primary has, or the data we have is
// anterior to that of the primary when we loaded the namespace, then proxy the
// request to the primary
Expand Down
3 changes: 2 additions & 1 deletion libsql-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1008,12 +1008,13 @@ where
let make_replication_svc = Box::new({
let registry = registry.clone();
let disable_namespaces = self.disable_namespaces;
move |store, user_auth, _, _, _| -> BoxReplicationService {
move |store, user_auth, _, _, service_internal| -> BoxReplicationService {
Box::new(LibsqlReplicationService::new(
registry.clone(),
store,
user_auth,
disable_namespaces,
service_internal,
))
}
});
Expand Down
Loading

0 comments on commit 87001c0

Please sign in to comment.