Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 0f15a96

Browse files
committed
write proxy: expire old write delegation streams
Right now we don't have a mechanism for expiring write delegation streams that disconnected without sending a valid Disconnect message. That unfortunately also means that we leak libSQL connections, which are kept in memory indefinitely. That leaks memory as well as disk space, because connections also keep temporary tables in place. FIXME: we also need to keep information about expired streams for longer, in case a client tries to send a late message. Such a message should be rejected in order to avoid running it outside of transaction context.
1 parent 2346a2b commit 0f15a96

File tree

5 files changed

+82
-8
lines changed

5 files changed

+82
-8
lines changed

sqld/src/connection/libsql.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ where
143143
}
144144
}
145145

146-
#[derive(Clone)]
146+
#[derive(Clone, Debug)]
147147
pub struct LibSqlConnection {
148148
sender: crossbeam::channel::Sender<ExecCallback>,
149149
}

sqld/src/connection/mod.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::atomic::{AtomicUsize, Ordering};
1+
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
22
use std::sync::Arc;
33
use tokio::time::Duration;
44

@@ -231,6 +231,14 @@ impl Drop for WaitersGuard<'_> {
231231
}
232232
}
233233

234+
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0` instead of panicking, and a
/// millisecond count that does not fit in `u64` saturates at `u64::MAX` instead
/// of being silently truncated.
fn now_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // `duration_since` fails only when the clock reads earlier than the epoch.
        .map_or(0, |elapsed| {
            elapsed.as_millis().min(u128::from(u64::MAX)) as u64
        })
}
241+
234242
#[async_trait::async_trait]
235243
impl<F: MakeConnection> MakeConnection for MakeThrottledConnection<F> {
236244
type Connection = TrackedConnection<F::Connection>;
@@ -266,14 +274,28 @@ impl<F: MakeConnection> MakeConnection for MakeThrottledConnection<F> {
266274
}
267275

268276
let inner = self.connection_maker.create().await?;
269-
Ok(TrackedConnection { permit, inner })
277+
Ok(TrackedConnection {
278+
permit,
279+
inner,
280+
atime: AtomicU64::new(now_millis()),
281+
})
270282
}
271283
}
272284

285+
// A connection bundled with a semaphore permit and a last-access timestamp.
#[derive(Debug)]
273286
pub struct TrackedConnection<DB> {
274287
// The wrapped connection; the `Connection` methods visible in this file forward to it.
inner: DB,
275288
#[allow(dead_code)] // just hold on to it
276289
permit: tokio::sync::OwnedSemaphorePermit,
290+
// Last access time in milliseconds since the Unix epoch: initialised from
// `now_millis()` at creation, overwritten by `execute_program`, `describe`
// and `checkpoint`, and read by `idle_time`.
atime: AtomicU64,
291+
}
292+
293+
impl<DB> TrackedConnection<DB> {
294+
pub fn idle_time(&self) -> Duration {
295+
let now = now_millis();
296+
let atime = self.atime.load(Ordering::Relaxed);
297+
Duration::from_millis(now - atime)
298+
}
277299
}
278300

279301
#[async_trait::async_trait]
@@ -286,6 +308,7 @@ impl<DB: Connection> Connection for TrackedConnection<DB> {
286308
builder: B,
287309
replication_index: Option<FrameNo>,
288310
) -> crate::Result<(B, State)> {
311+
self.atime.store(now_millis(), Ordering::Relaxed);
289312
self.inner
290313
.execute_program(pgm, auth, builder, replication_index)
291314
.await
@@ -298,6 +321,7 @@ impl<DB: Connection> Connection for TrackedConnection<DB> {
298321
auth: Authenticated,
299322
replication_index: Option<FrameNo>,
300323
) -> crate::Result<DescribeResult> {
324+
self.atime.store(now_millis(), Ordering::Relaxed);
301325
self.inner.describe(sql, auth, replication_index).await
302326
}
303327

@@ -308,6 +332,7 @@ impl<DB: Connection> Connection for TrackedConnection<DB> {
308332

309333
// Records the current time as this connection's last access, then forwards
// the checkpoint request to the wrapped connection and returns its result.
#[inline]
310334
async fn checkpoint(&self) -> Result<()> {
335+
// Refresh the access timestamp before delegating, so `idle_time` restarts from this call.
self.atime.store(now_millis(), Ordering::Relaxed);
311336
self.inner.checkpoint().await
312337
}
313338
}

sqld/src/lib.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -480,7 +480,20 @@ where
480480
}
481481

482482
if let Some(config) = self.rpc_config.take() {
483+
let proxy_service =
484+
ProxyService::new(namespaces.clone(), None, self.disable_namespaces);
485+
// Garbage collect proxy clients every 30 seconds
486+
self.join_set.spawn({
487+
let clients = proxy_service.clients();
488+
async move {
489+
loop {
490+
tokio::time::sleep(Duration::from_secs(30)).await;
491+
rpc::proxy::garbage_collect(&mut *clients.write().await).await;
492+
}
493+
}
494+
});
483495
self.join_set.spawn(run_rpc_server(
496+
proxy_service,
484497
config.acceptor,
485498
config.tls_config,
486499
self.idle_shutdown_kicker.clone(),
@@ -498,7 +511,16 @@ where
498511

499512
let proxy_service =
500513
ProxyService::new(namespaces.clone(), Some(self.auth), self.disable_namespaces);
501-
514+
// Garbage collect proxy clients every 30 seconds
515+
self.join_set.spawn({
516+
let clients = proxy_service.clients();
517+
async move {
518+
loop {
519+
tokio::time::sleep(Duration::from_secs(30)).await;
520+
rpc::proxy::garbage_collect(&mut *clients.write().await).await;
521+
}
522+
}
523+
});
502524
Ok((namespaces, proxy_service, logger_service))
503525
}
504526
}

sqld/src/rpc/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,14 @@ pub mod replication_log_proxy;
2424
pub const NAMESPACE_DOESNT_EXIST: &str = "NAMESPACE_DOESNT_EXIST";
2525
pub(crate) const NAMESPACE_METADATA_KEY: &str = "x-namespace-bin";
2626

27-
#[allow(clippy::too_many_arguments)]
2827
pub async fn run_rpc_server<A: crate::net::Accept>(
28+
proxy_service: ProxyService,
2929
acceptor: A,
3030
maybe_tls: Option<TlsConfig>,
3131
idle_shutdown_layer: Option<IdleShutdownKicker>,
3232
namespaces: NamespaceStore<PrimaryNamespaceMaker>,
3333
disable_namespaces: bool,
3434
) -> anyhow::Result<()> {
35-
let proxy_service = ProxyService::new(namespaces.clone(), None, disable_namespaces);
3635
let logger_service = ReplicationLogService::new(
3736
namespaces.clone(),
3837
idle_shutdown_layer.clone(),

sqld/src/rpc/proxy.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ pub mod rpc {
265265
}
266266

267267
pub struct ProxyService {
268-
clients: RwLock<HashMap<Uuid, Arc<TrackedConnection<LibSqlConnection>>>>,
268+
clients: Arc<RwLock<HashMap<Uuid, Arc<TrackedConnection<LibSqlConnection>>>>>,
269269
namespaces: NamespaceStore<PrimaryNamespaceMaker>,
270270
auth: Option<Arc<Auth>>,
271271
disable_namespaces: bool,
@@ -277,13 +277,19 @@ impl ProxyService {
277277
auth: Option<Arc<Auth>>,
278278
disable_namespaces: bool,
279279
) -> Self {
280+
let clients: Arc<RwLock<HashMap<Uuid, Arc<TrackedConnection<LibSqlConnection>>>>> =
281+
Default::default();
280282
Self {
281-
clients: Default::default(),
283+
clients,
282284
namespaces,
283285
auth,
284286
disable_namespaces,
285287
}
286288
}
289+
290+
/// Returns another handle to the shared, lock-protected map of client
/// connections keyed by client id. The map itself is not copied; only the
/// reference count of the `Arc` is bumped.
pub fn clients(&self) -> Arc<RwLock<HashMap<Uuid, Arc<TrackedConnection<LibSqlConnection>>>>> {
    Arc::clone(&self.clients)
}
287293
}
288294

289295
#[derive(Debug, Default)]
@@ -441,6 +447,28 @@ impl QueryResultBuilder for ExecuteResultBuilder {
441447
}
442448
}
443449

450+
// Disconnects all clients that have been idle for more than 30 seconds.
451+
// FIXME: we should also keep a list of recently disconnected clients,
452+
// and if one should arrive with a late message, it should be rejected
453+
// with an error. A similar mechanism is already implemented in hrana-over-http.
454+
pub async fn garbage_collect(
455+
clients: &mut HashMap<Uuid, Arc<TrackedConnection<LibSqlConnection>>>,
456+
) {
457+
let mut to_remove = Vec::new();
458+
let limit = std::time::Duration::from_secs(30);
459+
for (client_id, db) in clients.iter() {
460+
if db.idle_time() > limit {
461+
to_remove.push(*client_id);
462+
}
463+
}
464+
465+
tracing::trace!("garbage collecting clients: {to_remove:?}");
466+
for client_id in to_remove {
467+
clients.remove(&client_id);
468+
}
469+
tracing::trace!("remaining client handles: {:?}", clients);
470+
}
471+
444472
#[tonic::async_trait]
445473
impl Proxy for ProxyService {
446474
async fn execute(

0 commit comments

Comments
 (0)