Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions glide-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ rustls = { version = "0.23", features = ["aws-lc-rs"] }
rustls-pemfile = "2"
rustls-pki-types = "1"
telemetrylib = { path = "./telemetry" }
glidecachelib = { path = "./cache" }
tokio = { version = "1", features = ["macros", "time"] }
logger_core = { path = "../logger_core" }
tokio-util = { version = "^0.7", features = ["rt"], optional = true }
Expand Down Expand Up @@ -51,8 +52,9 @@ async-trait = { version = "0.1" }
serde_json = "1"
serde = { version = "1", features = ["derive"] }
versions = "7"
strum = "0.26"
strum = "0.26"
strum_macros = "0.26"
moka = { version = "0.12", features = ["sync"] }

[features]
proto = ["protobuf"]
Expand All @@ -76,15 +78,18 @@ criterion = { version = "^0.6", features = ["html_reports", "async_tokio"] }
which = "8"
ctor = "0.5"
redis = { path = "./redis-rs/redis", features = ["tls-rustls-insecure"] }
rustls = { version = "0.23", features = ["aws-lc-rs"]}
rustls = { version = "0.23", features = ["aws-lc-rs"] }
iai-callgrind = "0.15"
tokio = { version = "1", features = ["rt-multi-thread"] }
glide-core = { path = ".", features = [
"socket-layer",
] } # always enable this feature in tests.

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(standalone_heartbeat)', 'cfg(feature, values("iam_tests"))'] }
unexpected_cfgs = { level = "warn", check-cfg = [
'cfg(standalone_heartbeat)',
'cfg(feature, values("iam_tests"))',
] }

[build-dependencies]
protobuf-codegen = "3"
Expand Down
16 changes: 16 additions & 0 deletions glide-core/cache/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "glidecachelib"
version = "0.1.0"
edition = "2024"
license = "Apache-2.0"
authors = ["Valkey GLIDE Maintainers"]

[dependencies]
lazy_static = "1"
redis = { path = "../redis-rs/redis", features = [] }
moka = { version = "0.12", features = ["sync"] }
dashmap = "6"
logger_core = { path = "../../logger_core" }
once_cell = "1"
parking_lot = "0.12"
tokio = { version = "1", features = ["rt", "time", "macros"] }
314 changes: 314 additions & 0 deletions glide-core/cache/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
use dashmap::DashMap;
use logger_core::{log_debug, log_info};
use moka::policy::EvictionPolicy as MokaEvictionPolicy;
use moka::sync::Cache;
use once_cell::sync::Lazy;
use redis::Value;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::task::JoinHandle;

/// Process-wide registry mapping cache IDs to weak cache handles.
/// Weak references let a cache be freed once every strong (`Arc`) holder is
/// gone; stale entries are purged lazily by `get_cache` and by the
/// housekeeping task.
static CACHE_REGISTRY: Lazy<DashMap<String, Weak<Cache<Vec<u8>, Value>>>> =
    Lazy::new(DashMap::new);

/// Handle of the background housekeeping task, if one has been spawned.
/// Guarded by a mutex so start/stop decisions are serialized.
static HOUSEKEEPING_HANDLE: Lazy<parking_lot::Mutex<Option<JoinHandle<()>>>> =
    Lazy::new(|| parking_lot::Mutex::new(None));

/// Cache eviction policy
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EvictionPolicy {
    /// Least Recently Used - Evicts the least recently accessed entry.
    ///
    /// This is the default policy (`#[default]` replaces the previous
    /// hand-written `impl Default`, which returned `Self::Lru`).
    #[default]
    Lru,
    /// TinyLFU (Frequency-based) - Combines frequency and recency based eviction.
    /// When the cache is full, it only admits new entries if they're accessed more frequently than existing ones.
    /// This prevents rarely-accessed or one-time keys from evicting popular entries. **Once admitted,
    /// entries are evicted based on least recent access (LRU)**. Best for most workloads.
    TinyLfu,
}

/// Gets a cache by ID.
/// Returns None if the cache doesn't exist or has been dropped.
///
/// A failed upgrade also purges the stale registry entry as a side effect.
pub fn get_cache(cache_id: &str) -> Option<Arc<Cache<Vec<u8>, Value>>> {
    // Clone the Weak out of the map so the shard read guard is released
    // before we touch the registry again below (a guard held across the
    // removal would deadlock on the same shard).
    let weak = CACHE_REGISTRY
        .get(cache_id)
        .map(|entry| entry.value().clone())?;

    match weak.upgrade() {
        Some(cache) => {
            log_info("cache_lifetime", format!("Retrieved cache `{cache_id}`"));
            Some(cache)
        }
        None => {
            // Cache was dropped: clean up the weak reference. `remove_if`
            // re-checks liveness under the shard lock, so a live cache
            // re-created under the same ID by a concurrent caller is not
            // deleted by mistake (a plain `remove` here would race).
            CACHE_REGISTRY.remove_if(cache_id, |_, w| w.upgrade().is_none());
            log_info(
                "cache_lifetime",
                format!("Cache `{cache_id}` was already dropped"),
            );
            None
        }
    }
}

/// Creates (or retrieves) a cache with the given ID.
/// If the cache already exists, returns the existing one.
/// If it doesn't exist, creates a new one.
pub fn get_or_create_cache(
cache_id: &str,
max_cache_kb: u64,
ttl_sec: Option<u64>,
eviction_policy: Option<EvictionPolicy>,
) -> Arc<Cache<Vec<u8>, Value>> {
// First, try to get existing cache
if let Some(cache) = get_cache(cache_id) {
return cache;
}

// Create cache with weigher to measure actual byte size
let mut builder = Cache::builder()
.max_capacity(max_cache_kb * 1024) // Convert KB to bytes
.weigher(cache_entry_weigher);

if let Some(ttl) = ttl_sec {
builder = builder.time_to_live(Duration::from_secs(ttl));
}

// Configure eviction policy
let policy = eviction_policy.unwrap_or_default();
match policy {
EvictionPolicy::TinyLfu => {
builder = builder.eviction_policy(MokaEvictionPolicy::tiny_lfu());
}
EvictionPolicy::Lru => {
builder = builder.eviction_policy(MokaEvictionPolicy::lru());
}
}

let cache = Arc::new(builder.build());

log_info(
"cache_lifetime",
format!("Created cache with ID: `{cache_id}`"),
);

// Store weak reference in registry
CACHE_REGISTRY.insert(cache_id.to_string(), Arc::downgrade(&cache));

// Start housekeeping task if this is the first cache
start_cache_housekeeping();

cache
}

/// Calculates the total memory size of a Value in bytes.
/// This includes the enum overhead and all allocated data.
pub fn calculate_value_size(value: &Value) -> usize {
// Every Value has a base overhead for the enum discriminant and largest variant
let base_overhead = std::mem::size_of::<Value>();

// Plus any additional allocated data
let additional_data = match value {
Value::Nil | Value::Int(_) | Value::Double(_) | Value::Boolean(_) => 0,
Value::BulkString(data) => data.len(),
Value::Array(arr) => {
arr.len() * std::mem::size_of::<Value>()
+ arr
.iter()
.map(|v| calculate_value_additional_data(v))
.sum::<usize>()
}
Value::SimpleString(s) => s.len(),
Value::Okay => 0,
Value::Map(map) => {
map.len() * std::mem::size_of::<(Value, Value)>()
+ map
.iter()
.map(|(k, v)| {
calculate_value_additional_data(k) + calculate_value_additional_data(v)
})
.sum::<usize>()
}
Value::Attribute { data, attributes } => {
std::mem::size_of::<Value>() // boxed value overhead
+ calculate_value_additional_data(data)
+ attributes.len() * std::mem::size_of::<(Value, Value)>()
+ attributes
.iter()
.map(|(k, v)| calculate_value_additional_data(k) + calculate_value_additional_data(v))
.sum::<usize>()
}
Value::Set(set) => {
set.len() * std::mem::size_of::<Value>()
+ set
.iter()
.map(|v| calculate_value_additional_data(v))
.sum::<usize>()
}
Value::VerbatimString { format: _, text } => text.len(),
Value::BigNumber(big_int) => {
// BigInt allocates memory based on the number size
((big_int.bits() + 7) / 8) as usize // Convert bits to bytes
}
Value::Push { kind: _, data } => {
data.len() * std::mem::size_of::<Value>()
+ data
.iter()
.map(|v| calculate_value_additional_data(v))
.sum::<usize>()
}
Value::ServerError(err) => {
// Adjust based on ServerError's actual structure
std::mem::size_of_val(err)
}
};

base_overhead + additional_data
}

/// Returns only the heap-allocated portion of `value`'s size — the total
/// from `calculate_value_size` minus the fixed enum footprint that every
/// `Value` carries regardless of variant. The subtraction cannot underflow
/// because `calculate_value_size` always adds that footprint back in.
fn calculate_value_additional_data(value: &Value) -> usize {
    let total = calculate_value_size(value);
    let base = std::mem::size_of::<Value>();
    total - base
}

/// Weigher function for Moka cache.
/// Reports the total size of a cache entry (key bytes + value estimate) in
/// bytes, saturating at `u32::MAX` since Moka weighs entries with a `u32`.
/// (The `&Vec<u8>` parameter shape is dictated by Moka's weigher signature.)
fn cache_entry_weigher(key: &Vec<u8>, value: &Value) -> u32 {
    let entry_bytes = key.len() + calculate_value_size(value);
    u32::try_from(entry_bytes).unwrap_or(u32::MAX)
}

/// Periodically runs pending tasks for all live caches
async fn periodic_cache_housekeeping(interval_duration: Duration) {
log_info(
"cache_housekeeping",
format!(
"Started cache housekeeping task (interval: {:?})",
interval_duration
),
);

loop {
tokio::time::sleep(interval_duration).await;

let mut live_count = 0;
let mut dead_keys = Vec::new();

// Process all caches
for entry in CACHE_REGISTRY.iter() {
match entry.value().upgrade() {
Some(cache) => {
cache.run_pending_tasks();
live_count += 1;
}
None => {
// Cache is dead, mark for removal
dead_keys.push(entry.key().clone());
}
}
}

// Clean up dead cache entries
for key in dead_keys {
CACHE_REGISTRY.remove(&key);
log_debug(
"cache_housekeeping",
format!("Removed dead cache entry: {}", key),
);
}

// If no live caches remain, stop the housekeeping task
if live_count == 0 && CACHE_REGISTRY.is_empty() {
log_info(
"cache_housekeeping",
"No live caches remaining, stopping housekeeping task",
);
break;
}

log_debug(
"cache_housekeeping",
format!("Processed {} live caches", live_count),
);
}

log_info("cache_housekeeping", "Cache housekeeping task stopped");
}

/// Start the cache housekeeping background task.
///
/// Idempotent: does nothing when a housekeeping task is already running.
/// Two fixes over the naive version:
/// * `is_some()` alone treated a *finished* task as running, permanently
///   blocking a restart after the task stopped — we also check
///   `is_finished()`.
/// * `tokio::spawn` panics outside a runtime (e.g. in plain `#[test]`
///   functions); `Handle::try_current` lets us degrade gracefully instead.
pub fn start_cache_housekeeping() {
    let mut handle_guard = HOUSEKEEPING_HANDLE.lock();

    // Don't start if a task exists AND it has not finished yet.
    if let Some(handle) = handle_guard.as_ref() {
        if !handle.is_finished() {
            log_debug("cache_housekeeping", "Housekeeping task already running");
            return;
        }
    }

    match tokio::runtime::Handle::try_current() {
        Ok(runtime) => {
            let task = runtime.spawn(periodic_cache_housekeeping(Duration::from_millis(500)));
            *handle_guard = Some(task);
            log_info("cache_housekeeping", "Started cache housekeeping task");
        }
        Err(_) => {
            // No runtime in this context: caches still work, but deferred
            // maintenance won't run until a later call from a runtime thread.
            log_debug(
                "cache_housekeeping",
                "No Tokio runtime available; cache housekeeping not started",
            );
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Generates a unique cache ID so tests stay isolated: CACHE_REGISTRY is
    /// a process-wide global and the test harness runs tests in parallel.
    fn test_cache_id(suffix: &str) -> String {
        format!(
            "test-cache-{}-{}",
            suffix,
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        )
    }

    #[test]
    fn test_get_or_create() {
        // Use the isolation helper — the previous hard-coded "my-cache-123"
        // leaked state across runs and bypassed test isolation.
        let cache_id = test_cache_id("get-or-create");
        let cache1 = get_or_create_cache(&cache_id, 100, Some(60), None);
        let cache2 = get_or_create_cache(&cache_id, 100, Some(60), None);

        // Should be the same cache instance
        assert!(Arc::ptr_eq(&cache1, &cache2));
    }

    #[test]
    fn test_get_cache_nonexistent() {
        let cache_id = test_cache_id("nonexistent");

        let result = get_cache(&cache_id);
        assert!(result.is_none());
    }

    #[test]
    fn test_cache_weak_reference_cleanup() {
        let cache_id = test_cache_id("weak-ref");

        {
            let _cache = get_or_create_cache(&cache_id, 100, None, None);
            assert!(get_cache(&cache_id).is_some());
            // cache is dropped here
        }

        // Registry still has the entry (weak reference)
        assert!(CACHE_REGISTRY.contains_key(&cache_id));

        // But get_cache should clean it up and return None
        let result = get_cache(&cache_id);
        assert!(result.is_none());

        // Now it should be removed from registry
        assert!(!CACHE_REGISTRY.contains_key(&cache_id));
    }
}
Loading
Loading