
Conversation

peterneyens

The main goal is to reduce the overhead we add when we have a lot of subscriptions.

  • Avoid locking for subscribe and unsubscribe. We should be able to subscribe to or unsubscribe from a channel/pattern without being blocked by concurrent subscriptions to other channels/patterns.
    In this PR we represent the subscription lifecycle as multiple states. We can move between those states without locking, while operations on the same channel/pattern still wait on state changes by using a Deferred as a signal (a rough sketch follows this list).
    We don't go back to a single Ref holding a Map, but switch to a MapRef backed by a ConcurrentHashMap, which reduces contention when there are a lot of concurrent state changes.
  • Avoid using multiple RedisPubSubListeners.
    Lettuce only ever calls the listeners one by one. Since we have a map with all subscriptions, a single listener can look up the right subscription and publish to its topic.
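
As a rough illustration of the per-key lifecycle described above (the type and field names below are made up for the sketch; the real PR code differs, and Active corresponds to the old Redis4CatsSubscription):

```scala
import cats.effect.Deferred
import cats.effect.std.MapRef
import fs2.concurrent.Topic

// Sketch only: each channel/pattern key maps to an independent state value,
// and concurrent operations on the *same* key wait on a Deferred instead of
// taking a lock shared by all subscriptions.
sealed trait SubscriptionState[F[_], V]
object SubscriptionState {
  // A subscribe for this key is in flight; callers wait on `signal` for its outcome.
  final case class Subscribing[F[_], V](signal: Deferred[F, Either[Throwable, Unit]])
      extends SubscriptionState[F, V]

  // Roughly the old Redis4CatsSubscription: a live Topic plus the number of subscribers.
  final case class Active[F[_], V](topic: Topic[F, V], subscribers: Long)
      extends SubscriptionState[F, V]

  // An unsubscribe for this key is in flight; new subscribers wait for it to finish.
  final case class Unsubscribing[F[_], V](signal: Deferred[F, Either[Throwable, Unit]])
      extends SubscriptionState[F, V]

  // One entry per channel/pattern, updated atomically per key (no global lock),
  // backed by a ConcurrentHashMap via MapRef.
  type Subscriptions[F[_], K, V] = MapRef[F, K, Option[SubscriptionState[F, V]]]
}
```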

This is a more ambitious version of what I started in #972.
Unfortunately this made the diff a lot larger than I wanted it to be.

  • While avoiding the lock clearly makes things more complicated, it also forces us to think harder about the different states and state changes (for example, a failing unsubscribe currently leaves us in a state where we can't retry, since we keep a Redis4CatsSubscription with a single subscription but no actual subscribers to the topic).
  • I moved everything that is only used by the Subscriber implementation into the Subscriber companion object, which I think makes things easier (but that is probably subjective).
    The Redis4CatsSubscription class is now the Active part of SubscriptionState.

I tried to keep the behavior as close to the existing implementation as I could. There are a few places that could use some additional work, but I didn't want to make any further changes to the implementation in this PR.

  • I think we could unsubscribe from Redis inside unsubscribe itself instead of waiting on the finalizer of the last subscription stream (since no new messages will be processed anyway).
  • There are some potential improvements in how we handle publishing messages.
    We should document that a single subscriber not keeping up with its channel blocks not only all subscriptions for the same channel/pattern but those for all channels/patterns, since we can't publish to the topic.
    We could potentially publish to multiple channels/patterns in parallel (but we should benchmark that first).

@arturaz
Collaborator

arturaz commented Mar 27, 2025

Sorry for delaying the request for so long. Was busy, then got sick.

I hope to review this this week.

    sub: RedisChannel[String] => IO[Unit],
    unsub: RedisChannel[String] => IO[Unit]
): IO[Subscriber.SubscriptionMap[IO, RedisChannel[String], String]] = {
  // import effect.Log.Stdout._
Collaborator


remove?

Comment on lines 37 to 38
subscription <- map.subscribe(channel1).compile.toList.start
_ <- waitOnFiber
Collaborator


Instead of sleep, which makes for long tests and race conditions in CI, how about cats-effect's TestControl? Otherwise, if we want to keep the exact runtime:

Suggested change
-subscription <- map.subscribe(channel1).compile.toList.start
-_ <- waitOnFiber
+waitForSubscribe <- Deferred[IO, Unit]
+subscription <- map.subscribe(channel1).compile.toList.flatTap(_ => waitForSubscribe.complete(())).start
+_ <- waitForSubscribe.get
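
For reference, a minimal sketch (not the actual suite code) of how cats-effect's TestControl, from the cats-effect-testkit module, can run time-dependent code deterministically:

```scala
import cats.effect.IO
import cats.effect.testkit.TestControl
import scala.concurrent.duration._

// TestControl runs the program against a mock clock, so sleeps complete
// immediately and deterministically instead of waiting in real time.
val program: IO[String] =
  IO.sleep(500.millis) *> IO.pure("subscribed")

// executeEmbed yields the result once the mock clock has advanced past all
// outstanding sleeps, and fails if the program deadlocks or never terminates.
val test: IO[String] = TestControl.executeEmbed(program)
```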

Author


I used the sleep initially because that is what RedisPubSubSpec uses, but unlike those tests we aren't actually communicating with Redis here.

The complicating factor is that subscribe is doing quite a lot: it is subscribing, but also returning the messages. In these tests we want to be subscribed (both to Redis and to the Topic) before we use any of the other operations that assume there is a subscription.

I am working on a change to the internal subscribe signature to make it easier to test (separating the subscription from the message consumption effect), but I still need to clean up these tests a bit more before I push that work. Thanks for this comment; I should have given this some more thought before.

On a side note, your Deferred suggestion unfortunately doesn't work. The subscribe method never terminates on its own. We need something to call unsubscribe or end the stream early.

// Lettuce calls the listeners one by one (for every subscribe,
// unsubscribe, message, ...), so using multiple listeners when we can
// find the right subscription easily, is inefficient.
dispatcher <- Dispatcher.sequential[F] // we have no parallelism in the listener
Collaborator


Not familiar with the pubsub code, but I wonder if this code would be simpler if, for each pubSub.subscribe(myChannel), we added a listener to Lettuce that creates a sequential dispatcher and puts messages on a queue which an FS2 stream consumes. This would delegate all the state handling to Lettuce, which in turn uses Netty.

Looking at the existing code, I think it's somewhat what I'd expect, where a listener and subscriber are created for each subscribe call. All the listener does is publish to an fs2.Topic (which I'm not familiar with, but I'm guessing it's some kind of queue). It also seems like it could use a sequential rather than a parallel dispatcher, as all it does is add to the Topic.

So if I understand correctly, a single dispatcher could work, but I'm not sure it's necessarily better than multiple.

Author


The existing code uses a listener for every subscription indeed. Lettuce calls every listener for every message though. So the more subscriptions we have, the more listeners get called, where all but one will ignore the message. That feels very inefficient. Since we have a map with all subscriptions, we can pass the message to the Topic of the right subscription directly with a single listener and thus avoid calling N listeners for every message (when we have N distinct subscriptions).

This would delegate all the state handling to Lettuce which in turn uses Netty.

Not sure what you mean here. Lettuce only has a list of listeners, it doesn't know what those listeners are interested in (message, subscribed, unsubscribed, ...).

So if I understand correctly a single dispatcher could work, but not sure it's necessarily better than multiple.

In the existing code a single sequential dispatcher would have worked too I think. Since the listeners get called one by one and we use unsafeRunSync, there is no way for us to ever have multiple dispatcher calls at the same time.
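
As a sketch of the constant-time routing being discussed (reusing the hypothetical SubscriptionState from the earlier sketch; the PR's actual onMessage differs in detail):

```scala
import cats.Monad
import cats.effect.std.MapRef
import cats.syntax.all._

// Look the channel up in the subscription map and publish to its Topic:
// every message touches exactly one map entry instead of going through
// N listeners that all inspect it.
def onMessage[F[_]: Monad, K, V](
    subscriptions: MapRef[F, K, Option[SubscriptionState[F, V]]]
)(channel: K, message: V): F[Unit] =
  subscriptions(channel).get.flatMap {
    case Some(SubscriptionState.Active(topic, _)) => topic.publish1(message).void
    case _                                        => Monad[F].unit // no active subscription: drop the message
  }
```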

Collaborator


I see, Lettuce does a linear scan, whereas we can do a constant-time lookup to route a message to its respective topic. This makes sense from an efficiency perspective, which isn't to say that the Lettuce design is bad.

My initial gut reaction would be to support both, such that end users could set up multiple listeners if they want multiple consumers for the same key, but the default Subscriber goes for efficiency. Will refine these thoughts while reviewing.

Thanks for clarifying, will give PR another look.

Author


There is already support for multiple subscribers for the same key. That is why the code is using a Topic, which uses a Channel for every subscription.

new RedisPubSubAdapter[K, V] {
  override def message(ch: K, msg: V): Unit =
    try
      dispatcher.unsafeRunSync(state.channelSubs.onMessage(RedisChannel(ch), msg))
Collaborator


You mention this above, but since we funnel all messages through a single sequential dispatcher, which is backed by a single queue for the IO effects, this could create "head-of-line" blocking. There could be a topic with a single blocking subscriber, and that would block subscribers on all other topics. And this is because onMessage just routes to a topic and then publishes to it.

I think solutions would be to add queues or do routing at the Java layer.

Author

peterneyens commented Sep 1, 2025


this could create "head-of-line" blocking

We already have head-of-line blocking: Lettuce uses a single connection to Redis and calls listeners one by one. A topic with a single blocking subscriber currently already blocks all subscriptions. This PR doesn't solve that issue, no. The main change in this PR is to avoid blocking while subscribing and unsubscribing; the rest of the behaviour should have stayed the same.

I don't think it is an issue that we can solve. I think the best we can do is document clearly that blocked subscribers can block all Redis subscriptions and that clients should handle this on their end.

@mmienko
Collaborator

mmienko commented Apr 17, 2025

The main goal is to reduce the overhead we add when we have a lot of subscriptions. (1) Avoid locking for subscribe and unsubscribe....(2) Avoid using multiple RedisPubSubListeners....

I agree with point (2) and I would expect better performance with a single listener that dispatches to a single dispatcher. However, we need to be careful about subscribers of one topic blocking subscribers of other topics.

Regarding (1), yeah, it's significantly more complicated, which makes it difficult to reason about correctness. Curious how bad the blocking could be with AtomicCell and whether there is a cats-effect read-write lock we could use. I like the use of MapRef, but how come we need the Subscribing and Unsubscribing states? Lastly, I presume onMessage will get called more often than subscribe and unsubscribe, so we want onMessage to be more efficient, but we need to access the Map of Topics in each of these methods. Does that sound correct?

yisraelU changed the base branch from series/1.x to series/2.x on May 27, 2025
@peterneyens
Author

but how come we need the Subscribing and Unsubscribing states?

If we don't lock while subscribing and unsubscribing, we need to track that we are subscribing or unsubscribing, so that concurrent calls to subscribe/unsubscribe for the same key can wait until the in-progress subscribe/unsubscribe finishes (to use the fresh subscription, retry, ...).

This does make it more complicated indeed; that is unfortunately the consequence of removing the single shared lock for all subscriptions. We could wait until typelevel/cats-effect#4424 lands to make a SubscriptionMap implementation that is more like the existing implementation but with a lock per key.
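
Sketched out with the hypothetical types from the earlier sketch (glossing over error handling, the subscriber count, and the transition to Active), the per-key coordination is roughly:

```scala
import cats.effect.{ Concurrent, Deferred }
import cats.effect.std.MapRef
import cats.syntax.all._

// Join or start a subscription for one key without a global lock: the MapRef
// entry is updated atomically per key, and a Deferred lets concurrent callers
// on the same key wait for an in-flight subscribe/unsubscribe to finish.
def acquire[F[_], K, V](
    subscriptions: MapRef[F, K, Option[SubscriptionState[F, V]]],
    subscribeToRedis: K => F[Unit]
)(key: K)(implicit F: Concurrent[F]): F[Unit] =
  Deferred[F, Either[Throwable, Unit]].flatMap { signal =>
    subscriptions(key).modify {
      case None =>
        // Nobody holds this key: claim it and perform the Redis subscribe.
        // (The real code then replaces Subscribing with Active; omitted here.)
        val run = subscribeToRedis(key) >> signal.complete(Right(())).void
        (Some(SubscriptionState.Subscribing[F, V](signal)), run)
      case active @ Some(SubscriptionState.Active(_, _)) =>
        // Already subscribed: nothing to do (the real code bumps the subscriber count).
        (active, F.unit)
      case other @ Some(SubscriptionState.Subscribing(s)) =>
        // Another subscribe is in flight: wait for its signal, then re-check.
        (other, s.get.void >> acquire(subscriptions, subscribeToRedis)(key))
      case other @ Some(SubscriptionState.Unsubscribing(s)) =>
        // An unsubscribe is in flight: wait for it to finish, then re-check.
        (other, s.get.void >> acquire(subscriptions, subscribeToRedis)(key))
    }.flatten
  }
```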

Lastly, I presume onMessage will get called more often than subscribe and unsubscribe, so we want onMessage to be more efficient, but we need to access the Map of Topics on each method. Does that sound correct?

Yes, that is correct, but this should still be faster than how we currently call all listeners one by one for every message.

@peterneyens
Author

Sorry for the very long delay, I didn't find time to come back to this.

I pushed the changes I mentioned to handle the subscription lifecycle more explicitly (using Resource) (#984 (comment)). That way we no longer need the fiber await sleeps in SubscriberSuite, because inside the Resource we will have an active subscription.

It would be nice to expose a subscribeAwait(key: K): Resource[F, Stream[F, V]] alongside subscribe in SubscribeCommands to give clients insight into when the subscription is available if they want it, but that is unfortunately a breaking change (and I missed 2.x).
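
A usage sketch of that proposed (not merged) subscribeAwait shape, mirroring fs2's Topic.subscribeAwait: the Resource acquisition only completes once the subscription is established, so code inside use can rely on it.

```scala
import cats.effect.{ IO, Resource }
import fs2.Stream

// Hypothetical: `subscribeAwait` is the proposed API, passed in here as a
// parameter because it does not exist in SubscribeCommands today.
def example(
    subscribeAwait: String => Resource[IO, Stream[IO, String]]
): IO[List[String]] =
  subscribeAwait("events").use { messages =>
    // The subscription is active at this point; consume a few messages and stop.
    messages.take(3).compile.toList
  }
```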
