Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement PubSub api #714

Merged
merged 58 commits into from
Jul 27, 2023
Merged

Implement PubSub api #714

merged 58 commits into from
Jul 27, 2023

Conversation

0pg
Copy link
Contributor

@0pg 0pg commented Jan 4, 2023

resolves #160

Added RedisSubscription and SubscriptionExecutor

  • Distinct between Publish and Subscribe commands
  • Execute Subscribe commands on SubscriptionExecutor
  • Subscribe commands with multiple parameters at once return Stream[RedisError, (String, A)] that tuple is (key, value) pair

@0pg 0pg requested a review from a team as a code owner January 4, 2023 16:24
@CLAassistant
Copy link

CLAassistant commented Jan 4, 2023

CLA assistant check
All committers have signed the CLA.

@0pg
Copy link
Contributor Author

0pg commented Jan 9, 2023

It might be better to get the subscription state for connection recovery 🤔

@0pg
Copy link
Contributor Author

0pg commented Jan 16, 2023

@mijicd I think I've done for the first step. please give me a feedback about this PR when you are ready 😃

) extends RedisExecutor {
scope: Scope.Closeable,
codec: BinaryCodec
) extends RedisExecutor
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think that cluster executor should inherit RedisPubSub.
I think it's more logic if it would be other way, there are would be some RedisPubSubClusterExecutor that is going to inherit ClusterExecuter, like SingleNodeRedisPubSubExecutor
And there is going to be some consistency.

@mijicd
Copy link
Member

mijicd commented Jan 30, 2023

@0pg please rebase and address the change requested by @anatolysergeev 🙏

redis/src/main/scala/zio/redis/SingleNodeExecutor.scala Outdated Show resolved Hide resolved
redis/src/main/scala/zio/redis/SingleNodeExecutor.scala Outdated Show resolved Hide resolved
redis/src/main/scala/zio/redis/ClusterExecutor.scala Outdated Show resolved Hide resolved
}
}

final def subscribe(channel: String, channels: String*): ZStream[Redis, RedisError, PushProtocol] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of multiple channels passed, we should return a List/Chunk of streams. Also, I think that we should hide PushProtocol from the end user.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of multiple channels passed, we should return a List/Chunk of streams.

okay then it'd be better to get api with a single param and the callers have to handle multiple params using ZIO.foreach

I think that we should hide PushProtocol from the end user.

hmm... meaning that defines separate data types containing pushed message content for only end users or registers callbacks for each pubsub command for the users who want to handle subscribe/unsubscribe messages?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or using ResultBuilder that ask to give callbacks of sub/unsub when calls returning

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def subscribe(channel: String): ResultOutputStreamBuilder =
  new ResultOutputStreamBuilder(
    override def returning[R: Schema]: ZStream[Redis, RedisError, R] =
        ZStream.serviceWithStream[Redis] { redis =>
          RedisPubSubCommand
            .run(RedisPubSubCommand.Subscribe(channel))
            .collect { case t: PushProtocol.Message => t.message }
            .mapZIO { resp =>
              ZIO
                .attempt(ArbitraryOutput[R]().unsafeDecode(resp)(redis.codec))
                .refineToOrDie[RedisError]
            }
        }
  )

def subscribe(channels: Chunk[String]): ResultOutputMultiStreamBuilder =
  new ResultOutputMultiStreamBuilder(
    override def returning[R: Schema]: UIO[Chunk[ZStream[Redis, RedisError, R]]] =
        ZIO.serviceWithZIO[Redis] { redis =>
          RedisPubSubCommand
            .run(RedisPubSubCommand.Subscribe(channels.head, channels.tail))
            .collect { case t: PushProtocol.Message => t.message }
            .broadcast(channels.length, maximumLag)
            .map(streams =>
              streams.zipWith(channels) { (stream, channel) =>
                stream
                  .filter(_.key.value == channel)
                  .mapZIO { resp =>
                    ZIO
                      .attempt(ArbitraryOutput[R]().unsafeDecode(resp)(redis.codec))
                      .refineToOrDie[RedisError]
                  }
              }
            )
        }

This is just a sketch. I hope that I managed to explain what's the base idea through this code. Of course, something like ResultOutputMultiStreamBuilder should be added for multi-stream scenarios.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that's almost same what I though and mentioned above 👍

Fix RedisExecutor structure

Add PubSub api

Fix broken compile

Add PushProtocolOutput suite

Fix message field type as generic

Fix broken output

Fix PushProtocol output spec

Add key property in PushProtocol

Add test implementation

Add PubSub integration test

Fix formatting

Refactor RedisPubSub

Apply RedisPubSub refactoring

Apply RedisPubSub refactoring to t/c

Remove unused file

Fix logic bugs

Fix broken t/c

Fix unsubscribe process

Fix pubSubSpec

Add request message broker in SingleNodeRedisPubSub

Simplify RedisPubSub's public api

Revert unrelated changes
@0pg
Copy link
Contributor Author

0pg commented Feb 8, 2023

@anovakovic01 Hi there are some changes please check 🙏

  • added ResultStreamBuilder[+F[_]
  • added subscription callback

Cluster PubSubExecutor will be worked by another PR

redis/src/main/scala/zio/redis/Output.scala Outdated Show resolved Hide resolved
redis/src/main/scala/zio/redis/Output.scala Outdated Show resolved Hide resolved
case class Pattern(value: String) extends SubscriptionKey
}

case class NumSubResponse(channel: String, subscriberCount: Long)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it would be better to name this class NumberOfSubscribers, or NumOfSubs. Also, you should place it in the options/PubSub.scala file and make it final.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed and moved it inside to companion object of options/PubSub


object PushProtocol {
case class Subscribe(channel: String, numOfSubscription: Long) extends PushProtocol {
def key: SubscriptionKey = SubscriptionKey.Channel(channel)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need SubscriptionKey? Can you use String instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean that gets rid of SubscriptionKey? or changes return type of def key to String?
I changed return type to String

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for not being clear. I meant to remove SubscriptionKey entirely and replace it with a plain String.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I moved SubscriptionKey into SingleNodeRedisPubSub as a private case class


final def subscribeWithCallback(
channel: String,
channels: List[String]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use String* here.

final def subscribe(channel: String): ResultStreamBuilder[Id] =
subscribeWithCallback(channel)(emptyCallback)

final def subscribe(channel: String, channels: List[String]): ResultStreamBuilder[List] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use String* here.

Comment on lines 62 to 65
ZIO
.attempt(ArbitraryOutput[R]().unsafeDecode(msg)(codec))
.refineToOrDie[RedisError]
.asSome
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this can be part of the run implementation in the RedisPubSubCommand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made RedisPubSubCommand takes responsibilities of transforming message types and invoking callback

Comment on lines 14 to 29
sealed trait PubSubCommand

object PubSubCommand {
case class Subscribe(
channel: String,
channels: List[String],
onSubscribe: PubSubCallback
) extends PubSubCommand
case class PSubscribe(
pattern: String,
patterns: List[String],
onSubscribe: PubSubCallback
) extends PubSubCommand
case class Unsubscribe(channels: List[String]) extends PubSubCommand
case class PUnsubscribe(patterns: List[String]) extends PubSubCommand
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should hide these from the end user. These should be in a dedicated PubSubCommand file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved it to its own file but do we need to add package private access modifier?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we do if it's not part of the api interface.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added package private accessor

@0pg
Copy link
Contributor Author

0pg commented May 7, 2023

@anovakovic01 I fixed some cross compile issues 🙏
92457c4

@mijicd
Copy link
Member

mijicd commented May 7, 2023

@anovakovic01 this PR needs to address a few things before merging (compilation, linting, sync with master)

@0pg
Copy link
Contributor Author

0pg commented May 8, 2023

Is there anything to do about test failures? it seems like non deterministic 🤔

@mijicd
Copy link
Member

mijicd commented May 8, 2023

It looks like these aren't directly related to your change, so I'd say let's not touch any of them in this PR.

Copy link
Member

@mijicd mijicd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for taking care of this. It's a massive chunk of work, and it's going in the right direction.

During the first review pass, I focused on the package layout, top-level abstractions, and protocol correctness. Once all of them are in place, I'll do a more detailed review of its internals, specifically the streaming part.

One general remark is that we can improve the test suite to use property tests and cover more than just a "happy path". Note that this remark applies to most tests, but we must start improving their state somewhere.

Comment on lines 28 to 33
case class Subscribe(channel: String, numOfSubs: Long) extends PushProtocol
case class PSubscribe(pattern: String, numOfSubs: Long) extends PushProtocol
case class Unsubscribe(channel: String, numOfSubs: Long) extends PushProtocol
case class PUnsubscribe(pattern: String, numOfSubs: Long) extends PushProtocol
case class Message(channel: String, message: RespValue) extends PushProtocol
case class PMessage(pattern: String, channel: String, message: RespValue) extends PushProtocol
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make the case classes final. Besides that, types look odd, and there's a lot of duplication.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I grouped message types using new SubscriptionKey type that contains context of channel or pattern
a154683

Copy link
Contributor Author

@0pg 0pg May 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it also changes SingleNodeSubscriptionExecutor's member Ref that manages subscription state

object PubSub {
type PubSubCallback = (String, Long) => UIO[Unit]

private[redis] sealed trait PushProtocol
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rename this to PushMessage. Also, this doesn't really look like an option, it's very much internal thing.

@@ -639,6 +640,50 @@ object Output {
}
}

case object PushProtocolOutput extends Output[PushProtocol] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename accordingly (see the remark about PushProtocol below). Another question is how visible it should be.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PushMessageOutput is only used in internal package so I added private[redis] to this object
a154683

commandName <-
ZIO
.fromOption(command.args.collectFirst { case RespCommandArgument.CommandName(name) => name })
.orElseFail(RedisError.CommandNameNotFound(command.args.toString()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's adjust the error constructor to avoid invoking toString explicitly.

new ResultStreamBuilder1[({ type lambda[x] = (String, x) })#lambda] {
def returning[R: Schema]: Stream[RedisError, (String, R)] =
RedisSubscriptionCommand(executor).subscribe(
Chunk.single(channel) ++ Chunk.fromIterable(channels),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for chunk concatenation.

Comment on lines +88 to +89
pattern: String,
patterns: String*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type-safety could be improved.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting that defines the parameter types for channels and patterns?

}

object Subscription {
private lazy val emptyCallback = (_: String, _: Long) => ZIO.unit
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's follow the constants definition guideline. Also no need to make it lazy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed this value and replaced to using Option

import zio.schema.Schema
import zio.schema.codec.BinaryCodec

private[redis] trait SubscribeEnvironment {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might be able to generalize the environments, though I'm fine with exploring that possibility in a follow-up.

@@ -1,9 +1,10 @@
package zio.redis

import zio._
import zio.redis.Output._
import zio.redis.Output.{PushProtocolOutput, _}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an unnecessary change.

Copy link
Member

@mijicd mijicd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for taking care of this. It's a massive chunk of work, and it's going in the right direction.

During the first review pass, I focused on the package layout, top-level abstractions, and protocol correctness. Once all of them are in place, I'll do a more detailed review of its internals, specifically the streaming part.

One general remark is that we can improve the test suite to use property tests and cover more than just a "happy path". Note that this remark applies to most tests, but we must start improving their state somewhere.

@0pg
Copy link
Contributor Author

0pg commented Jun 2, 2023

@mijicd Thank you for your review. I worked some parts based on comments. please to check changes and reply comments when you're fine. 😃

Copy link
Member

@mijicd mijicd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@anovakovic01 @anatolysergeev Please take a look!


package zio.redis.internal

object PubSub {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just mark the whole object as private[redis]. Everything in internal is marked that way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I marked it to them


private[redis] object RequestQueue {
private final val RequestQueueSize = 16
def create[A]: UIO[Queue[A]] = Queue.bounded[A](RequestQueueSize)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would drop this and move constant to package object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved constant to package object and removed this object

Comment on lines 34 to 39
val run: IO[RedisError, AnyVal] =
ZIO.logTrace(s"$this sender and reader has been started") *>
(send.repeat(Schedule.forever) race receive)
.tapError(e => ZIO.logWarning(s"Reconnecting due to error: $e") *> onError(e))
.retryWhile(True)
.tapError(e => ZIO.logError(s"Executor exiting: $e"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should likely be private and final.

Copy link
Contributor Author

@0pg 0pg Jun 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I marked it as protected final(it broke compile on scala3) private[internal] final

import zio.{Chunk, ChunkBuilder, Hub, IO, Promise, Queue, Ref, Scope, UIO, URIO, ZIO}

private[redis] final class SingleNodeSubscriptionExecutor private (
subsRef: Ref[Map[SubscriptionKey, Hub[Take[RedisError, PushMessage]]]],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be worth using ConcurrentMap.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I replaced Ref[Map[...]] to ConcurrentMap

def getHub(key: SubscriptionKey): IO[RedisError, Hub[Take[RedisError, PushMessage]]] =
subsRef.get
.map(_.get(key))
.flatMap(ZIO.fromOption(_))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can go without (_). Do that on all applicable places.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed to use .apply because 3.2.2 version requires it

}
}

case object NumSubResponseOutput extends Output[Chunk[NumberOfSubscribers]] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that there is an order in the response, but maybe you should use a Map instead (for easier access to the value of the specific channel).

.fromOption(command.args.collectFirst { case RespCommandArgument.CommandName(name) =>
name
})
.orElseFail(RedisError.CommandNameNotFound(command))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that ProtocolError should be thrown instead, but I'm not sure how you can do it from here. @mijicd maybe we can update RespCommand to receive a NonEmptyChunk because it always has to have a name (or add a name parameter). We could do this in a dedicated PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thrown ProtocolError more makes sense 🤔 I'll try to this using NonEmptyChunk approach in another PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made PR that gives name separated field
#852

Comment on lines 33 to 34
onSubscribe: Option[PubSubCallback],
onUnsubscribe: Option[PubSubCallback]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about giving default values here and removing Option by introducing NoopCallback:

Suggested change
onSubscribe: Option[PubSubCallback],
onUnsubscribe: Option[PubSubCallback]
onSubscribe: PubSubCallback = NoopCallback,
onUnsubscribe: PubSubCallback = NoopCallback

The same suggestion applies to other methods with optional callback parameters.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it seems to make more ergonomic api. I changed to use NoopCallback.
I changed function name subscribe to subscribeSingle for only single channel subscription call because it's more clear to distinct multiple channels subscription call (also they have different return type signature)
5ca98e7 (#714)

}.map(SubscriptionKey.Pattern.apply)

def send: IO[RedisError.IOError, Unit] =
requests.takeAll.flatMap { reqs =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use takeBetween(1, RequestQueueSize) just like in the existing send implementation.


trait Subscription extends SubscribeEnvironment {

final def subscribe(channel: String): ResultStreamBuilder1[Id] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add doc comments for the public API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added doc Subscription and Publishing apis

@anovakovic01
Copy link
Member

@0pg Great work! You're almost there, these are some minor changes that I've commented on, but nothing major.

@mijicd mijicd merged commit 407b9ac into zio:master Jul 27, 2023
19 of 20 checks passed
@0pg 0pg mentioned this pull request Jul 28, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement pub/sub API
6 participants