-
Notifications
You must be signed in to change notification settings - Fork 63
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement PubSub api #714
Implement PubSub api #714
Conversation
It might be better to get the subscription state for connection recovery 🤔 |
@mijicd I think I've done for the first step. please give me a feedback about this PR when you are ready 😃 |
) extends RedisExecutor { | ||
scope: Scope.Closeable, | ||
codec: BinaryCodec | ||
) extends RedisExecutor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't think that cluster executor should inherit RedisPubSub.
I think it's more logic if it would be other way, there are would be some RedisPubSubClusterExecutor that is going to inherit ClusterExecuter, like SingleNodeRedisPubSubExecutor
And there is going to be some consistency.
@0pg please rebase and address the change requested by @anatolysergeev 🙏 |
} | ||
} | ||
|
||
final def subscribe(channel: String, channels: String*): ZStream[Redis, RedisError, PushProtocol] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case of multiple channels passed, we should return a List/Chunk of streams. Also, I think that we should hide PushProtocol
from the end user.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case of multiple channels passed, we should return a List/Chunk of streams.
okay then it'd be better to get api with a single param and the callers have to handle multiple params using ZIO.foreach
I think that we should hide PushProtocol from the end user.
hmm... meaning that defines separate data types containing pushed message content for only end users or registers callbacks for each pubsub command for the users who want to handle subscribe/unsubscribe messages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or using ResultBuilder that ask to give callbacks of sub/unsub when calls returning
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def subscribe(channel: String): ResultOutputStreamBuilder =
new ResultOutputStreamBuilder(
override def returning[R: Schema]: ZStream[Redis, RedisError, R] =
ZStream.serviceWithStream[Redis] { redis =>
RedisPubSubCommand
.run(RedisPubSubCommand.Subscribe(channel))
.collect { case t: PushProtocol.Message => t.message }
.mapZIO { resp =>
ZIO
.attempt(ArbitraryOutput[R]().unsafeDecode(resp)(redis.codec))
.refineToOrDie[RedisError]
}
}
)
def subscribe(channels: Chunk[String]): ResultOutputMultiStreamBuilder =
new ResultOutputMultiStreamBuilder(
override def returning[R: Schema]: UIO[Chunk[ZStream[Redis, RedisError, R]]] =
ZIO.serviceWithZIO[Redis] { redis =>
RedisPubSubCommand
.run(RedisPubSubCommand.Subscribe(channels.head, channels.tail))
.collect { case t: PushProtocol.Message => t.message }
.broadcast(channels.length, maximumLag)
.map(streams =>
streams.zipWith(channels) { (stream, channel) =>
stream
.filter(_.key.value == channel)
.mapZIO { resp =>
ZIO
.attempt(ArbitraryOutput[R]().unsafeDecode(resp)(redis.codec))
.refineToOrDie[RedisError]
}
}
)
}
This is just a sketch. I hope that I managed to explain what's the base idea through this code. Of course, something like ResultOutputMultiStreamBuilder
should be added for multi-stream scenarios.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah that's almost same what I though and mentioned above 👍
Fix RedisExecutor structure Add PubSub api Fix broken compile Add PushProtocolOutput suite Fix message field type as generic Fix broken output Fix PushProtocol output spec Add key property in PushProtocol Add test implementation Add PubSub integration test Fix formatting Refactor RedisPubSub Apply RedisPubSub refactoring Apply RedisPubSub refactoring to t/c Remove unused file Fix logic bugs Fix broken t/c Fix unsubscribe process Fix pubSubSpec Add request message broker in SingleNodeRedisPubSub Simplify RedisPubSub's public api Revert unrelated changes
@anovakovic01 Hi there are some changes please check 🙏
Cluster PubSubExecutor will be worked by another PR |
case class Pattern(value: String) extends SubscriptionKey | ||
} | ||
|
||
case class NumSubResponse(channel: String, subscriberCount: Long) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it would be better to name this class NumberOfSubscribers
, or NumOfSubs
. Also, you should place it in the options/PubSub.scala
file and make it final
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed and moved it inside to companion object of options/PubSub
|
||
object PushProtocol { | ||
case class Subscribe(channel: String, numOfSubscription: Long) extends PushProtocol { | ||
def key: SubscriptionKey = SubscriptionKey.Channel(channel) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need SubscriptionKey
? Can you use String
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you mean that gets rid of SubscriptionKey
? or changes return type of def key
to String
?
I changed return type to String
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for not being clear. I meant to remove SubscriptionKey
entirely and replace it with a plain String
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah I moved SubscriptionKey
into SingleNodeRedisPubSub
as a private case class
|
||
final def subscribeWithCallback( | ||
channel: String, | ||
channels: List[String] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use String*
here.
final def subscribe(channel: String): ResultStreamBuilder[Id] = | ||
subscribeWithCallback(channel)(emptyCallback) | ||
|
||
final def subscribe(channel: String, channels: List[String]): ResultStreamBuilder[List] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use String*
here.
ZIO | ||
.attempt(ArbitraryOutput[R]().unsafeDecode(msg)(codec)) | ||
.refineToOrDie[RedisError] | ||
.asSome |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that this can be part of the run
implementation in the RedisPubSubCommand
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made RedisPubSubCommand
takes responsibilities of transforming message types and invoking callback
sealed trait PubSubCommand | ||
|
||
object PubSubCommand { | ||
case class Subscribe( | ||
channel: String, | ||
channels: List[String], | ||
onSubscribe: PubSubCallback | ||
) extends PubSubCommand | ||
case class PSubscribe( | ||
pattern: String, | ||
patterns: List[String], | ||
onSubscribe: PubSubCallback | ||
) extends PubSubCommand | ||
case class Unsubscribe(channels: List[String]) extends PubSubCommand | ||
case class PUnsubscribe(patterns: List[String]) extends PubSubCommand | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should hide these from the end user. These should be in a dedicated PubSubCommand
file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved it to its own file but do we need to add package private access modifier?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we do if it's not part of the api
interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added package private accessor
Co-authored-by: Aleksandar Novaković <anovakovic01@gmail.com>
Co-authored-by: Aleksandar Novaković <anovakovic01@gmail.com>
@anovakovic01 I fixed some cross compile issues 🙏 |
@anovakovic01 this PR needs to address a few things before merging (compilation, linting, sync with master) |
Is there anything to do about test failures? it seems like non deterministic 🤔 |
It looks like these aren't directly related to your change, so I'd say let's not touch any of them in this PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for taking care of this. It's a massive chunk of work, and it's going in the right direction.
During the first review pass, I focused on the package layout, top-level abstractions, and protocol correctness. Once all of them are in place, I'll do a more detailed review of its internals, specifically the streaming part.
One general remark is that we can improve the test suite to use property tests and cover more than just a "happy path". Note that this remark applies to most tests, but we must start improving their state somewhere.
case class Subscribe(channel: String, numOfSubs: Long) extends PushProtocol | ||
case class PSubscribe(pattern: String, numOfSubs: Long) extends PushProtocol | ||
case class Unsubscribe(channel: String, numOfSubs: Long) extends PushProtocol | ||
case class PUnsubscribe(pattern: String, numOfSubs: Long) extends PushProtocol | ||
case class Message(channel: String, message: RespValue) extends PushProtocol | ||
case class PMessage(pattern: String, channel: String, message: RespValue) extends PushProtocol |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make the case classes final. Besides that, types look odd, and there's a lot of duplication.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I grouped message types using new SubscriptionKey
type that contains context of channel or pattern
a154683
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it also changes SingleNodeSubscriptionExecutor
's member Ref
that manages subscription state
object PubSub { | ||
type PubSubCallback = (String, Long) => UIO[Unit] | ||
|
||
private[redis] sealed trait PushProtocol |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's rename this to PushMessage
. Also, this doesn't really look like an option, it's very much internal thing.
@@ -639,6 +640,50 @@ object Output { | |||
} | |||
} | |||
|
|||
case object PushProtocolOutput extends Output[PushProtocol] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please rename accordingly (see the remark about PushProtocol
below). Another question is how visible it should be.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PushMessageOutput
is only used in internal
package so I added private[redis]
to this object
a154683
commandName <- | ||
ZIO | ||
.fromOption(command.args.collectFirst { case RespCommandArgument.CommandName(name) => name }) | ||
.orElseFail(RedisError.CommandNameNotFound(command.args.toString())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's adjust the error constructor to avoid invoking toString
explicitly.
new ResultStreamBuilder1[({ type lambda[x] = (String, x) })#lambda] { | ||
def returning[R: Schema]: Stream[RedisError, (String, R)] = | ||
RedisSubscriptionCommand(executor).subscribe( | ||
Chunk.single(channel) ++ Chunk.fromIterable(channels), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for chunk concatenation.
pattern: String, | ||
patterns: String* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Type-safety could be improved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you suggesting that defines the parameter types for channels and patterns?
} | ||
|
||
object Subscription { | ||
private lazy val emptyCallback = (_: String, _: Long) => ZIO.unit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's follow the constants definition guideline. Also no need to make it lazy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed this value and replaced to using Option
import zio.schema.Schema | ||
import zio.schema.codec.BinaryCodec | ||
|
||
private[redis] trait SubscribeEnvironment { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might be able to generalize the environments, though I'm fine with exploring that possibility in a follow-up.
@@ -1,9 +1,10 @@ | |||
package zio.redis | |||
|
|||
import zio._ | |||
import zio.redis.Output._ | |||
import zio.redis.Output.{PushProtocolOutput, _} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an unnecessary change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for taking care of this. It's a massive chunk of work, and it's going in the right direction.
During the first review pass, I focused on the package layout, top-level abstractions, and protocol correctness. Once all of them are in place, I'll do a more detailed review of its internals, specifically the streaming part.
One general remark is that we can improve the test suite to use property tests and cover more than just a "happy path". Note that this remark applies to most tests, but we must start improving their state somewhere.
@mijicd Thank you for your review. I worked some parts based on comments. please to check changes and reply comments when you're fine. 😃 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@anovakovic01 @anatolysergeev Please take a look!
|
||
package zio.redis.internal | ||
|
||
object PubSub { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just mark the whole object as private[redis]
. Everything in internal
is marked that way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I marked it to them
|
||
private[redis] object RequestQueue { | ||
private final val RequestQueueSize = 16 | ||
def create[A]: UIO[Queue[A]] = Queue.bounded[A](RequestQueueSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would drop this and move constant to package object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved constant to package object and removed this object
val run: IO[RedisError, AnyVal] = | ||
ZIO.logTrace(s"$this sender and reader has been started") *> | ||
(send.repeat(Schedule.forever) race receive) | ||
.tapError(e => ZIO.logWarning(s"Reconnecting due to error: $e") *> onError(e)) | ||
.retryWhile(True) | ||
.tapError(e => ZIO.logError(s"Executor exiting: $e")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should likely be private and final.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I marked it as (it broke compile on scala3) protected final
private[internal] final
modules/redis/src/main/scala/zio/redis/internal/SingleNodeSubscriptionExecutor.scala
Outdated
Show resolved
Hide resolved
import zio.{Chunk, ChunkBuilder, Hub, IO, Promise, Queue, Ref, Scope, UIO, URIO, ZIO} | ||
|
||
private[redis] final class SingleNodeSubscriptionExecutor private ( | ||
subsRef: Ref[Map[SubscriptionKey, Hub[Take[RedisError, PushMessage]]]], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be worth using ConcurrentMap
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I replaced Ref[Map[...]]
to ConcurrentMap
def getHub(key: SubscriptionKey): IO[RedisError, Hub[Take[RedisError, PushMessage]]] = | ||
subsRef.get | ||
.map(_.get(key)) | ||
.flatMap(ZIO.fromOption(_)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can go without (_)
. Do that on all applicable places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed to use .apply
because 3.2.2
version requires it
…criptionExecutor.scala Co-authored-by: Dejan Mijić <dmijic@acm.org>
} | ||
} | ||
|
||
case object NumSubResponseOutput extends Output[Chunk[NumberOfSubscribers]] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know that there is an order in the response, but maybe you should use a Map
instead (for easier access to the value of the specific channel).
.fromOption(command.args.collectFirst { case RespCommandArgument.CommandName(name) => | ||
name | ||
}) | ||
.orElseFail(RedisError.CommandNameNotFound(command)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that ProtocolError
should be thrown instead, but I'm not sure how you can do it from here. @mijicd maybe we can update RespCommand
to receive a NonEmptyChunk
because it always has to have a name (or add a name parameter). We could do this in a dedicated PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thrown ProtocolError
more makes sense 🤔 I'll try to this using NonEmptyChunk
approach in another PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made PR that gives name separated field
#852
onSubscribe: Option[PubSubCallback], | ||
onUnsubscribe: Option[PubSubCallback] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about giving default values here and removing Option
by introducing NoopCallback
:
onSubscribe: Option[PubSubCallback], | |
onUnsubscribe: Option[PubSubCallback] | |
onSubscribe: PubSubCallback = NoopCallback, | |
onUnsubscribe: PubSubCallback = NoopCallback |
The same suggestion applies to other methods with optional callback parameters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah it seems to make more ergonomic api. I changed to use NoopCallback
.
I changed function name subscribe
to subscribeSingle
for only single channel subscription call because it's more clear to distinct multiple channels subscription call (also they have different return type signature)
5ca98e7
(#714)
}.map(SubscriptionKey.Pattern.apply) | ||
|
||
def send: IO[RedisError.IOError, Unit] = | ||
requests.takeAll.flatMap { reqs => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use takeBetween(1, RequestQueueSize)
just like in the existing send
implementation.
|
||
trait Subscription extends SubscribeEnvironment { | ||
|
||
final def subscribe(channel: String): ResultStreamBuilder1[Id] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add doc comments for the public API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added doc Subscription
and Publishing
apis
@0pg Great work! You're almost there, these are some minor changes that I've commented on, but nothing major. |
resolves #160
Added
RedisSubscription
andSubscriptionExecutor
SubscriptionExecutor
Stream[RedisError, (String, A)]
that tuple is (key, value) pair