Skip to content

Commit

Permalink
chore: set exp time for collab base on collab type (AppFlowy-IO#820)
Browse files Browse the repository at this point in the history
* chore: set exp time for collab base on collab type

* chore: fix test
  • Loading branch information
appflowy authored Sep 13, 2024
1 parent 9422110 commit a4b885e
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 8 deletions.
13 changes: 10 additions & 3 deletions services/appflowy-collaborate/src/collab/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use database::collab::CollabMetadata;
use database_entity::dto::{CollabParams, QueryCollab, QueryCollabResult};

use crate::collab::disk_cache::CollabDiskCache;
use crate::collab::mem_cache::CollabMemCache;
use crate::collab::mem_cache::{cache_exp_secs_from_collab_type, CollabMemCache};
use crate::state::RedisConnectionManager;

#[derive(Clone)]
Expand Down Expand Up @@ -82,6 +82,7 @@ impl CollabCache {

// Retrieve from disk cache as fallback. After retrieval, the value is inserted into the memory cache.
let object_id = query.object_id.clone();
let expiration_secs = cache_exp_secs_from_collab_type(&query.collab_type);
let encode_collab = self.disk_cache.get_collab_encoded_from_disk(query).await?;

// spawn a task to insert the encoded collab into the memory cache
Expand All @@ -90,7 +91,7 @@ impl CollabCache {
let timestamp = chrono::Utc::now().timestamp();
tokio::spawn(async move {
mem_cache
.insert_encode_collab(&object_id, cloned_encode_collab, timestamp)
.insert_encode_collab(&object_id, cloned_encode_collab, timestamp, expiration_secs)
.await;
});
Ok(encode_collab)
Expand Down Expand Up @@ -159,6 +160,7 @@ impl CollabCache {
&object_id,
&encode_collab_data,
chrono::Utc::now().timestamp(),
Some(cache_exp_secs_from_collab_type(&params.collab_type)),
)
.await
{
Expand Down Expand Up @@ -201,7 +203,12 @@ impl CollabCache {
let timestamp = chrono::Utc::now().timestamp();
self
.mem_cache
.insert_encode_collab_data(&params.object_id, &params.encoded_collab_v1, timestamp)
.insert_encode_collab_data(
&params.object_id,
&params.encoded_collab_v1,
timestamp,
Some(cache_exp_secs_from_collab_type(&params.collab_type)),
)
.await
.map_err(|err| AppError::Internal(err.into()))?;
Ok(())
Expand Down
27 changes: 23 additions & 4 deletions services/appflowy-collaborate/src/collab/mem_cache.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::anyhow;
use collab::entity::EncodedCollab;
use collab_entity::CollabType;
use redis::{pipe, AsyncCommands};
use tracing::{error, instrument, trace};

Expand All @@ -8,7 +9,7 @@ use crate::state::RedisConnectionManager;
use app_error::AppError;
use database::collab::CollabMetadata;

const SEVEN_DAYS: i64 = 604800;
const SEVEN_DAYS: u64 = 604800;
const ONE_MONTH: u64 = 2592000;
#[derive(Clone)]
pub struct CollabMemCache {
Expand Down Expand Up @@ -113,13 +114,14 @@ impl CollabMemCache {
object_id: &str,
encoded_collab: EncodedCollab,
timestamp: i64,
expiration_seconds: u64,
) {
trace!("Inserting encode collab into cache: {}", object_id);
let result = tokio::task::spawn_blocking(move || encoded_collab.encode_to_bytes()).await;
match result {
Ok(Ok(bytes)) => {
if let Err(err) = self
.insert_data_with_timestamp(object_id, &bytes, timestamp)
.insert_data_with_timestamp(object_id, &bytes, timestamp, Some(expiration_seconds))
.await
{
error!("Failed to cache encoded collab: {:?}", err);
Expand All @@ -134,14 +136,17 @@ impl CollabMemCache {
}
}

/// Inserts data into Redis with a conditional timestamp.
/// if the expiration_seconds is None, the data will be expired after 7 days.
pub async fn insert_encode_collab_data(
&self,
object_id: &str,
data: &[u8],
timestamp: i64,
expiration_seconds: Option<u64>,
) -> redis::RedisResult<()> {
self
.insert_data_with_timestamp(object_id, data, timestamp)
.insert_data_with_timestamp(object_id, data, timestamp, expiration_seconds)
.await
}

Expand All @@ -163,6 +168,7 @@ impl CollabMemCache {
object_id: &str,
data: &[u8],
timestamp: i64,
expiration_seconds: Option<u64>,
) -> redis::RedisResult<()> {
let cache_object_id = encode_collab_key(object_id);
let mut conn = self.connection_manager.clone();
Expand Down Expand Up @@ -211,7 +217,7 @@ impl CollabMemCache {
.atomic()
.set(&cache_object_id, data)
.ignore()
.expire(&cache_object_id, SEVEN_DAYS) // Setting the expiration to 7 days
.expire(&cache_object_id, expiration_seconds.unwrap_or(SEVEN_DAYS) as i64) // Setting the expiration to 7 days
.ignore();
pipeline.query_async(&mut conn).await?;
}
Expand Down Expand Up @@ -286,3 +292,16 @@ fn encode_collab_key(object_id: &str) -> String {
fn collab_meta_key(object_id: &str) -> String {
format!("collab_meta_v0:{}", object_id)
}

#[inline]
pub fn cache_exp_secs_from_collab_type(collab_type: &CollabType) -> u64 {
match collab_type {
CollabType::Document => SEVEN_DAYS * 2,
CollabType::Database => SEVEN_DAYS * 2,
CollabType::WorkspaceDatabase => ONE_MONTH,
CollabType::Folder => SEVEN_DAYS,
CollabType::DatabaseRow => ONE_MONTH,
CollabType::UserAwareness => SEVEN_DAYS * 2,
CollabType::Unknown => SEVEN_DAYS,
}
}
5 changes: 4 additions & 1 deletion tests/ai_test/chat_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,11 @@ async fn create_chat_context_test() {
.get_answer(&workspace_id, &chat_id, question.message_id)
.await
.unwrap();
assert!(answer.content.contains("US"));
println!("answer: {:?}", answer);
if answer.content.contains("United States") {
return;
}
assert!(answer.content.contains("US"));
}

// #[tokio::test]
Expand Down
4 changes: 4 additions & 0 deletions tests/collab/storage_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ async fn collab_mem_cache_read_write_test() {
&object_id,
&encode_collab.encode_to_bytes().unwrap(),
timestamp,
None,
)
.await
.unwrap();
Expand All @@ -275,6 +276,7 @@ async fn collab_mem_cache_insert_override_test() {
&object_id,
&encode_collab.encode_to_bytes().unwrap(),
timestamp,
None,
)
.await
.unwrap();
Expand All @@ -289,6 +291,7 @@ async fn collab_mem_cache_insert_override_test() {
.encode_to_bytes()
.unwrap(),
timestamp,
None,
)
.await
.unwrap();
Expand All @@ -308,6 +311,7 @@ async fn collab_mem_cache_insert_override_test() {
.encode_to_bytes()
.unwrap(),
timestamp,
None,
)
.await
.unwrap();
Expand Down

0 comments on commit a4b885e

Please sign in to comment.