Commit d6d055c

add background task
Signed-off-by: Shoham Elias <shohame@amazon.com>
1 parent: d54baef

7 files changed (+468, -7 lines)

glide-core/cache/Cargo.toml

Lines changed: 2 additions & 0 deletions

@@ -12,3 +12,5 @@ moka = { version = "0.12", features = ["sync"] }
 dashmap = "6"
 logger_core = { path = "../../logger_core" }
 once_cell = "1"
+parking_lot = "0.12"
+tokio = { version = "1", features = ["rt", "time", "macros"] }
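
Both additions back the new background task in lib.rs below: parking_lot supplies the mutex guarding the housekeeping task handle, while tokio's "rt" and "time" features cover the tokio::spawn and tokio::time::sleep calls it uses.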

glide-core/cache/src/lib.rs

Lines changed: 81 additions & 6 deletions

@@ -1,16 +1,20 @@
 // Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
 use dashmap::DashMap;
-use logger_core::log_info;
+use logger_core::{log_debug, log_info};
 use moka::policy::EvictionPolicy as MokaEvictionPolicy;
 use moka::sync::Cache;
 use once_cell::sync::Lazy;
 use redis::Value;
 use std::sync::{Arc, Weak};
 use std::time::Duration;
+use tokio::task::JoinHandle;

 static CACHE_REGISTRY: Lazy<DashMap<String, Weak<Cache<Vec<u8>, Value>>>> =
     Lazy::new(|| DashMap::new());

+static HOUSEKEEPING_HANDLE: Lazy<parking_lot::Mutex<Option<JoinHandle<()>>>> =
+    Lazy::new(|| parking_lot::Mutex::new(None));
+
 /// Cache eviction policy
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum EvictionPolicy {
@@ -96,6 +100,9 @@ pub fn get_or_create_cache(
     // Store weak reference in registry
     CACHE_REGISTRY.insert(cache_id.to_string(), Arc::downgrade(&cache));

+    // Start housekeeping task if this is the first cache
+    start_cache_housekeeping();
+
     cache
 }

@@ -174,14 +181,82 @@ fn calculate_value_additional_data(value: &Value) -> usize {
 /// Returns the total size of the cache entry (key + value) in bytes
 fn cache_entry_weigher(key: &Vec<u8>, value: &Value) -> u32 {
     let total_size = key.len() + calculate_value_size(value);
-    println!(
-        "Cache entry weigher: key_len = {}, value_size = {}",
-        key.len(),
-        calculate_value_size(value)
-    );
     total_size.try_into().unwrap_or(u32::MAX)
 }

+/// Periodically runs pending tasks for all live caches
+async fn periodic_cache_housekeeping(interval_duration: Duration) {
+    log_info(
+        "cache_housekeeping",
+        format!(
+            "Started cache housekeeping task (interval: {:?})",
+            interval_duration
+        ),
+    );
+
+    loop {
+        tokio::time::sleep(interval_duration).await;
+
+        let mut live_count = 0;
+        let mut dead_keys = Vec::new();
+
+        // Process all caches
+        for entry in CACHE_REGISTRY.iter() {
+            match entry.value().upgrade() {
+                Some(cache) => {
+                    cache.run_pending_tasks();
+                    live_count += 1;
+                }
+                None => {
+                    // Cache is dead, mark for removal
+                    dead_keys.push(entry.key().clone());
+                }
+            }
+        }
+
+        // Clean up dead cache entries
+        for key in dead_keys {
+            CACHE_REGISTRY.remove(&key);
+            log_debug(
+                "cache_housekeeping",
+                format!("Removed dead cache entry: {}", key),
+            );
+        }
+
+        // If no live caches remain, stop the housekeeping task
+        if live_count == 0 && CACHE_REGISTRY.is_empty() {
+            log_info(
+                "cache_housekeeping",
+                "No live caches remaining, stopping housekeeping task",
+            );
+            break;
+        }
+
+        log_debug(
+            "cache_housekeeping",
+            format!("Processed {} live caches", live_count),
+        );
+    }
+
+    log_info("cache_housekeeping", "Cache housekeeping task stopped");
+}
+
+/// Start the cache housekeeping background task
+pub fn start_cache_housekeeping() {
+    let mut handle_guard = HOUSEKEEPING_HANDLE.lock();
+
+    // Don't start if already running
+    if handle_guard.is_some() {
+        log_debug("cache_housekeeping", "Housekeeping task already running");
+        return;
+    }
+
+    let task = tokio::spawn(periodic_cache_housekeeping(Duration::from_millis(500)));
+    *handle_guard = Some(task);
+
+    log_info("cache_housekeeping", "Started cache housekeeping task");
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
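
The design is worth spelling out: the registry holds only Weak references, so the housekeeping loop can run moka's deferred maintenance (run_pending_tasks) on live caches without keeping dropped caches alive; it prunes registry entries whose cache has been freed and exits once nothing is left. A minimal sketch of the same pattern in Python for illustration (all names here are illustrative stand-ins, not part of GLIDE):

import threading
import time
import weakref


class Cache:
    """Stand-in for the moka cache; real eviction/expiry work would go here."""

    def run_pending_tasks(self) -> None:
        pass


_registry = {}  # cache_id -> weakref.ref, mirrors CACHE_REGISTRY
_lock = threading.Lock()
_running = False  # mirrors the HOUSEKEEPING_HANDLE "already started" check


def _housekeeping(interval: float) -> None:
    global _running
    while True:
        time.sleep(interval)
        with _lock:
            # Prune entries whose cache has been garbage-collected.
            for key in [k for k, ref in _registry.items() if ref() is None]:
                del _registry[key]
            live = [ref() for ref in _registry.values()]
        for cache in live:
            if cache is not None:
                cache.run_pending_tasks()  # deferred maintenance on live caches
        if not live:
            with _lock:
                _running = False
            break  # no live caches remain: stop, as the Rust loop does


def register(cache_id: str, cache: Cache, interval: float = 0.5) -> None:
    """Mirrors get_or_create_cache registering + start_cache_housekeeping."""
    global _running
    with _lock:
        _registry[cache_id] = weakref.ref(cache)
        if _running:
            return
        _running = True
    threading.Thread(target=_housekeeping, args=(interval,), daemon=True).start()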

python/glide-async/python/glide/__init__.py

Lines changed: 5 additions & 0 deletions

@@ -158,6 +158,8 @@
     VectorFieldAttributesHnsw,
     VectorType,
     json_batch,
+    ClientSideCache,
+    EvictionPolicy,
 )

 from .async_commands import ft, glide_json
@@ -385,4 +387,7 @@ def __getattr__(self, name):
     "FtAggregateSortProperty",
     "FtProfileOptions",
     "QueryType",
+    # Cache
+    "ClientSideCache",
+    "EvictionPolicy",
 ]

python/glide-shared/glide_shared/__init__.py

Lines changed: 4 additions & 0 deletions

@@ -168,6 +168,7 @@
     SlotKeyRoute,
     SlotType,
 )
+from .cache import ClientSideCache, EvictionPolicy

 __all__ = [
     # Client
@@ -327,4 +328,7 @@
     "FtAggregateSortProperty",
     "FtProfileOptions",
     "QueryType",
+    # Cache
+    "ClientSideCache",
+    "EvictionPolicy",
 ]

python/glide-shared/glide_shared/cache.py

Lines changed: 7 additions & 0 deletions

@@ -78,6 +78,13 @@ def create(
             currently supported. This means cached values may become stale if updated on
             the server before the TTL expires.

+            **Note**: For two clients to share the same cache, they must be created
+            with the same `ClientSideCache` instance.
+            * Clients with different `ClientSideCache` instances will have separate
+              caches, even if the configurations are identical.
+            * Clients using different databases cannot share the same cache.
+            * Clients using different ACL users cannot share the same cache.
+
         Args:
             max_cache_kb (int): Maximum size of the cache in kilobytes (KB). This limits
                 the total memory used by cached keys and values. When this limit is reached,
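
A usage sketch of the sharing rule. The create() signature beyond max_cache_kb, and the client bootstrap (GlideClient, GlideClientConfiguration, NodeAddress), are taken from GLIDE's existing Python API rather than this diff, so treat the details as assumptions:

import asyncio

from glide import (
    ClientSideCache,
    GlideClient,
    GlideClientConfiguration,
    NodeAddress,
)


async def main() -> None:
    # One cache instance, shared by both clients. max_cache_kb is the only
    # argument this diff documents; ClientSideCache.create may accept more.
    cache = ClientSideCache.create(max_cache_kb=10 * 1024)

    addresses = [NodeAddress("localhost", 6379)]
    config_a = GlideClientConfiguration(addresses, client_side_cache=cache)
    config_b = GlideClientConfiguration(addresses, client_side_cache=cache)

    client_a = await GlideClient.create(config_a)
    client_b = await GlideClient.create(config_b)

    await client_a.set("key", "value")
    # Both clients read through the same local cache.
    print(await client_b.get("key"))

    await client_a.close()
    await client_b.close()


asyncio.run(main())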

python/glide-shared/glide_shared/config.py

Lines changed: 18 additions & 0 deletions

@@ -355,6 +355,12 @@ class BaseClientConfiguration:
             If not set, connections are established immediately during client creation (equivalent to `False`).
         client_side_cache (Optional[ClientSideCache]): Configuration for client-side caching.
             See `ClientSideCache` for more information.
+            **Note**: For two clients to share the same cache, they must be created
+            with the same `ClientSideCache` instance.
+            * Clients with different `ClientSideCache` instances will have separate
+              caches, even if the configurations are identical.
+            * Clients using different databases cannot share the same cache.
+            * Clients using different ACL users cannot share the same cache.
     """

     def __init__(
@@ -600,6 +606,12 @@ class GlideClientConfiguration(BaseClientConfiguration):
             see `AdvancedGlideClientConfiguration`.
         client_side_cache (Optional[ClientSideCache]): Configuration for client-side caching.
             See `ClientSideCache` for more information.
+            **Note**: For two clients to share the same cache, they must be created
+            with the same `ClientSideCache` instance.
+            * Clients with different `ClientSideCache` instances will have separate
+              caches, even if the configurations are identical.
+            * Clients using different databases cannot share the same cache.
+            * Clients using different ACL users cannot share the same cache.
     """

 class PubSubChannelModes(IntEnum):
@@ -797,6 +809,12 @@ class GlideClusterClientConfiguration(BaseClientConfiguration):
             see `AdvancedGlideClusterClientConfiguration`.
         client_side_cache (Optional[ClientSideCache]): Configuration for client-side caching.
             See `ClientSideCache` for more information.
+            **Note**: For two clients to share the same cache, they must be created
+            with the same `ClientSideCache` instance.
+            * Clients with different `ClientSideCache` instances will have separate
+              caches, even if the configurations are identical.
+            * Clients using different databases cannot share the same cache.
+            * Clients using different ACL users cannot share the same cache.

     Note:
         Currently, the reconnection strategy in cluster mode is not configurable, and exponential backoff
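
The first bullet is an easy pitfall; a short sketch of what does not share, continuing the assumed names from the example above:

from glide import ClientSideCache, GlideClientConfiguration, NodeAddress

# Identical arguments, but two create() calls: these are two separate caches.
cache_a = ClientSideCache.create(max_cache_kb=10 * 1024)
cache_b = ClientSideCache.create(max_cache_kb=10 * 1024)

addresses = [NodeAddress("localhost", 6379)]
config_a = GlideClientConfiguration(addresses, client_side_cache=cache_a)
config_b = GlideClientConfiguration(addresses, client_side_cache=cache_b)
# Clients built from these configs cache independently, even though every
# configuration value matches; only sharing the same instance shares the cache.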
