Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add metrics to redis collab stream #1116

Merged
merged 2 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libs/collab-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tokio-util = { version = "0.7" }
prost.workspace = true
async-stream.workspace = true
async-trait.workspace = true
prometheus-client.workspace = true
zstd = "0.13"
loole = "0.4.0"

Expand Down
12 changes: 10 additions & 2 deletions libs/collab-stream/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::collab_update_sink::{AwarenessUpdateSink, CollabUpdateSink};
use crate::error::{internal, StreamError};
use crate::lease::{Lease, LeaseAcquisition};
use crate::metrics::CollabStreamMetrics;
use crate::model::{AwarenessStreamUpdate, CollabStreamUpdate, MessageId};
use crate::stream_group::{StreamConfig, StreamGroup};
use crate::stream_router::{StreamRouter, StreamRouterOptions};
Expand All @@ -21,14 +22,21 @@ pub struct CollabRedisStream {
impl CollabRedisStream {
pub const LEASE_TTL: Duration = Duration::from_secs(60);

pub async fn new(redis_client: redis::Client) -> Result<Self, redis::RedisError> {
pub async fn new(
redis_client: redis::Client,
metrics: Arc<CollabStreamMetrics>,
) -> Result<Self, redis::RedisError> {
let router_options = StreamRouterOptions {
worker_count: 60,
xread_streams: 100,
xread_block_millis: Some(5000),
xread_count: None,
};
let stream_router = Arc::new(StreamRouter::with_options(&redis_client, router_options)?);
let stream_router = Arc::new(StreamRouter::with_options(
&redis_client,
metrics,
router_options,
)?);
let connection_manager = redis_client.get_connection_manager().await?;
Ok(Self::new_with_connection_manager(
connection_manager,
Expand Down
1 change: 1 addition & 0 deletions libs/collab-stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod client;
pub mod collab_update_sink;
pub mod error;
pub mod lease;
pub mod metrics;
pub mod model;
pub mod pubsub;
pub mod stream_group;
Expand Down
28 changes: 28 additions & 0 deletions libs/collab-stream/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use prometheus_client::metrics::counter::Counter;
use prometheus_client::registry::Registry;

#[derive(Default)]
pub struct CollabStreamMetrics {
/// Incremented each time a new collab stream read task is set (including recurring tasks).
pub reads_enqueued: Counter,
/// Incremented each time an existing task is consumed (including recurring tasks).
pub reads_dequeued: Counter,
}

impl CollabStreamMetrics {
pub fn register(registry: &mut Registry) -> Self {
let metrics = Self::default();
let realtime_registry = registry.sub_registry_with_prefix("collab_stream");
realtime_registry.register(
"reads_enqueued",
"Incremented each time a new collab stream read task is set (including recurring tasks).",
metrics.reads_enqueued.clone(),
);
realtime_registry.register(
"reads_dequeued",
"Incremented each time an existing task is consumed (including recurring tasks).",
metrics.reads_dequeued.clone(),
);
metrics
}
}
96 changes: 84 additions & 12 deletions libs/collab-stream/src/stream_router.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::metrics::CollabStreamMetrics;
use loole::{Receiver, Sender};
use redis::streams::{StreamReadOptions, StreamReadReply};
use redis::Client;
Expand Down Expand Up @@ -27,14 +28,19 @@ pub struct StreamRouter {
alive: Arc<AtomicBool>,
#[allow(dead_code)]
workers: Vec<Worker>,
metrics: Arc<CollabStreamMetrics>,
}

impl StreamRouter {
pub fn new(client: &Client) -> Result<Self, RedisError> {
Self::with_options(client, Default::default())
pub fn new(client: &Client, metrics: Arc<CollabStreamMetrics>) -> Result<Self, RedisError> {
Self::with_options(client, metrics, Default::default())
}

pub fn with_options(client: &Client, options: StreamRouterOptions) -> Result<Self, RedisError> {
pub fn with_options(
client: &Client,
metrics: Arc<CollabStreamMetrics>,
options: StreamRouterOptions,
) -> Result<Self, RedisError> {
let alive = Arc::new(AtomicBool::new(true));
let (tx, rx) = loole::unbounded();
let mut workers = Vec::with_capacity(options.worker_count);
Expand All @@ -47,6 +53,7 @@ impl StreamRouter {
rx.clone(),
alive.clone(),
&options,
metrics.clone(),
);
workers.push(worker);
}
Expand All @@ -55,6 +62,7 @@ impl StreamRouter {
buf: tx,
workers,
alive,
metrics,
})
}

Expand All @@ -63,6 +71,7 @@ impl StreamRouter {
let last_id = last_id.unwrap_or_else(|| "0".to_string());
let h = StreamHandle::new(stream_key, last_id, tx);
self.buf.send(h).unwrap();
self.metrics.reads_enqueued.inc();
rx
}
}
Expand Down Expand Up @@ -118,6 +127,7 @@ impl Worker {
rx: Receiver<StreamHandle>,
alive: Arc<AtomicBool>,
options: &StreamRouterOptions,
metrics: Arc<CollabStreamMetrics>,
) -> Self {
let mut xread_options = StreamReadOptions::default();
if let Some(block_millis) = options.xread_block_millis {
Expand All @@ -128,7 +138,7 @@ impl Worker {
}
let count = options.xread_streams;
let handle = std::thread::spawn(move || {
if let Err(err) = Self::process_streams(conn, tx, rx, alive, xread_options, count) {
if let Err(err) = Self::process_streams(conn, tx, rx, alive, xread_options, count, metrics) {
tracing::error!("worker {} failed: {}", worker_id, err);
}
});
Expand All @@ -142,11 +152,13 @@ impl Worker {
alive: Arc<AtomicBool>,
options: StreamReadOptions,
count: usize,
metrics: Arc<CollabStreamMetrics>,
) -> RedisResult<()> {
let mut stream_keys = Vec::with_capacity(count);
let mut message_ids = Vec::with_capacity(count);
let mut senders = HashMap::with_capacity(count);
while alive.load(SeqCst) {
// receive next `count` of stream read requests
if !Self::read_buf(&rx, &mut stream_keys, &mut message_ids, &mut senders) {
break; // rx channel has closed
}
Expand All @@ -158,10 +170,12 @@ impl Worker {
continue;
}

metrics.reads_dequeued.inc_by(key_count as u64);
let result: StreamReadReply = conn.xread_options(&stream_keys, &message_ids, &options)?;

let mut msgs = 0;
for stream in result.keys {
// for each stream returned from Redis, resolve corresponding subscriber and send messages
let mut remove_sender = false;
if let Some((sender, idx)) = senders.get(stream.key.as_str()) {
for id in stream.ids {
Expand All @@ -170,7 +184,7 @@ impl Worker {
message_ids[*idx].clone_from(&message_id); //TODO: optimize
msgs += 1;
if let Err(err) = sender.send((message_id, value)) {
tracing::warn!("failed to send: {}", err);
tracing::debug!("failed to send: {}", err);
remove_sender = true;
}
}
Expand All @@ -188,7 +202,8 @@ impl Worker {
key_count
);
}
Self::schedule_back(&tx, &mut stream_keys, &mut message_ids, &mut senders);
let scheduled = Self::schedule_back(&tx, &mut stream_keys, &mut message_ids, &mut senders);
metrics.reads_enqueued.inc_by(scheduled as u64);
}
Ok(())
}
Expand All @@ -198,21 +213,27 @@ impl Worker {
keys: &mut Vec<StreamKey>,
ids: &mut Vec<String>,
senders: &mut HashMap<&str, (StreamSender, usize)>,
) {
) -> usize {
let keys = keys.drain(..);
let mut ids = ids.drain(..);
let mut scheduled = 0;
for key in keys {
if let Some(last_id) = ids.next() {
if let Some((sender, _)) = senders.remove(key.as_str()) {
if sender.is_closed() {
Copy link
Collaborator Author

@Horusiath Horusiath Jan 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem was that we only unscheduled the stream subscriber if it was closed AND stream returned at least one message that we wanted to forward into (now closed) subscriber. If no new messages from stream were received, it will be rescheduled again even if subscriber was closed.

This could mean that if we had 5000 subscribers in the past, but no new data was coming from their Redis streams, and only 5 of them where active atm. we still tried to XREAD 5000 streams instead of 5.

continue; // sender is already closed
}
let h = StreamHandle::new(key, last_id, sender);
if let Err(err) = tx.send(h) {
tracing::warn!("failed to reschedule: {}", err);
tracing::error!("failed to reschedule: {}", err);
break;
}
scheduled += 1;
}
}
}
senders.clear();
scheduled
}

fn read_buf(
Expand Down Expand Up @@ -276,19 +297,23 @@ impl StreamHandle {

#[cfg(test)]
mod test {
use crate::stream_router::StreamRouter;
use crate::metrics::CollabStreamMetrics;
use crate::stream_router::{StreamRouter, StreamRouterOptions};
use rand::random;
use redis::{Client, Commands, FromRedisValue};
use std::sync::Arc;
use tokio::task::JoinSet;

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn multi_worker_preexisting_messages() {
const ROUTES_COUNT: usize = 200;
const MSG_PER_ROUTE: usize = 10;

let mut client = Client::open("redis://127.0.0.1/").unwrap();
let keys = init_streams(&mut client, ROUTES_COUNT, MSG_PER_ROUTE);
let metrics = Arc::new(CollabStreamMetrics::default());

let router = StreamRouter::new(&client).unwrap();
let router = StreamRouter::new(&client, metrics).unwrap();

let mut join_set = JoinSet::new();
for key in keys {
Expand All @@ -313,8 +338,9 @@ mod test {
const MSG_PER_ROUTE: usize = 10;
let mut client = Client::open("redis://127.0.0.1/").unwrap();
let keys = init_streams(&mut client, ROUTES_COUNT, 0);
let metrics = Arc::new(CollabStreamMetrics::default());

let router = StreamRouter::new(&client).unwrap();
let router = StreamRouter::new(&client, metrics).unwrap();

let mut join_set = JoinSet::new();
for key in keys.iter() {
Expand Down Expand Up @@ -348,15 +374,61 @@ mod test {
let _: String = client.xadd(&key, "*", &[("data", 1)]).unwrap();
let m2: String = client.xadd(&key, "*", &[("data", 2)]).unwrap();
let m3: String = client.xadd(&key, "*", &[("data", 3)]).unwrap();
let metrics = Arc::new(CollabStreamMetrics::default());

let router = StreamRouter::new(&client).unwrap();
let router = StreamRouter::new(&client, metrics).unwrap();
let mut observer = router.observe(key, Some(m2));

let (msg_id, m) = observer.recv().await.unwrap();
assert_eq!(msg_id, m3);
assert_eq!(u32::from_redis_value(&m["data"]).unwrap(), 3);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn drop_subscription() {
const ROUTES_COUNT: usize = 1;
const MSG_PER_ROUTE: usize = 10;

let mut client = Client::open("redis://127.0.0.1/").unwrap();
let mut keys = init_streams(&mut client, ROUTES_COUNT, MSG_PER_ROUTE);
let metrics = Arc::new(CollabStreamMetrics::default());

let router = StreamRouter::with_options(
&client,
metrics.clone(),
StreamRouterOptions {
worker_count: 2,
xread_streams: 100,
xread_block_millis: Some(50),
xread_count: None,
},
)
.unwrap();

let key = keys.pop().unwrap();
let mut observer = router.observe(key.clone(), None);
for i in 0..MSG_PER_ROUTE {
let (_msg_id, map) = observer.recv().await.unwrap();
let value = String::from_redis_value(&map["data"]).unwrap();
assert_eq!(value, format!("{}-{}", key, i));
}
// drop observer and wait for worker to release
drop(observer);
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let enqueued = metrics.reads_enqueued.get();
let dequeued = metrics.reads_dequeued.get();
assert_eq!(enqueued, dequeued, "dropped observer state");

// after dropping observer, no new polling task should be rescheduled
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
assert_eq!(metrics.reads_enqueued.get(), enqueued, "unchanged enqueues");
assert_eq!(metrics.reads_dequeued.get(), dequeued, "unchanged dequeues");

tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
assert_eq!(metrics.reads_enqueued.get(), enqueued, "unchanged enqueues");
assert_eq!(metrics.reads_dequeued.get(), dequeued, "unchanged dequeues");
}

fn init_streams(client: &mut Client, stream_count: usize, msgs_per_stream: usize) -> Vec<String> {
let test_prefix: u32 = random();
let mut keys = Vec::with_capacity(stream_count);
Expand Down
11 changes: 9 additions & 2 deletions services/appflowy-collaborate/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::actix_ws::server::RealtimeServerActor;
use crate::api::{collab_scope, ws_scope};
use crate::collab::access_control::CollabStorageAccessControlImpl;
use access_control::casbin::access::AccessControl;
use collab_stream::metrics::CollabStreamMetrics;
use collab_stream::stream_router::{StreamRouter, StreamRouterOptions};
use database::file::s3_client_impl::AwsS3BucketClientImpl;

Expand Down Expand Up @@ -110,8 +111,12 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
let user_cache = UserCache::new(pg_pool.clone()).await;

info!("Connecting to Redis...");
let (redis_conn_manager, redis_stream_router) =
get_redis_client(config.redis_uri.expose_secret(), config.redis_worker_count).await?;
let (redis_conn_manager, redis_stream_router) = get_redis_client(
config.redis_uri.expose_secret(),
config.redis_worker_count,
metrics.collab_stream_metrics.clone(),
)
.await?;

// Pg listeners
info!("Setting up Pg listeners...");
Expand Down Expand Up @@ -189,12 +194,14 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
async fn get_redis_client(
redis_uri: &str,
worker_count: usize,
metrics: Arc<CollabStreamMetrics>,
) -> Result<(redis::aio::ConnectionManager, Arc<StreamRouter>), Error> {
info!("Connecting to redis with uri: {}", redis_uri);
let client = redis::Client::open(redis_uri).context("failed to connect to redis")?;

let router = StreamRouter::with_options(
&client,
metrics,
StreamRouterOptions {
worker_count,
xread_streams: 100,
Expand Down
4 changes: 4 additions & 0 deletions services/appflowy-collaborate/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::pg_listener::PgListeners;
use crate::CollabRealtimeMetrics;
use access_control::metrics::AccessControlMetrics;
use app_error::AppError;
use collab_stream::metrics::CollabStreamMetrics;
use collab_stream::stream_router::StreamRouter;
use database::user::{select_all_uid_uuid, select_uid_from_uuid};
use indexer::metrics::EmbeddingMetrics;
Expand Down Expand Up @@ -40,6 +41,7 @@ pub struct AppMetrics {
pub access_control_metrics: Arc<AccessControlMetrics>,
pub realtime_metrics: Arc<CollabRealtimeMetrics>,
pub collab_metrics: Arc<CollabMetrics>,
pub collab_stream_metrics: Arc<CollabStreamMetrics>,
pub embedding_metrics: Arc<EmbeddingMetrics>,
}

Expand All @@ -55,12 +57,14 @@ impl AppMetrics {
let access_control_metrics = Arc::new(AccessControlMetrics::register(&mut registry));
let realtime_metrics = Arc::new(CollabRealtimeMetrics::register(&mut registry));
let collab_metrics = Arc::new(CollabMetrics::register(&mut registry));
let collab_stream_metrics = Arc::new(CollabStreamMetrics::register(&mut registry));
let embedding_metrics = Arc::new(EmbeddingMetrics::register(&mut registry));
Self {
registry: Arc::new(registry),
access_control_metrics,
realtime_metrics,
collab_metrics,
collab_stream_metrics,
embedding_metrics,
}
}
Expand Down
Loading
Loading