Skip to content

Commit

Permalink
Add redis pubsub layer
Browse files Browse the repository at this point in the history
Fix RedisExecutor structure

Add PubSub api

Fix broken compile

Add PushProtocolOutput suite

Fix message field type as generic

Fix broken output

Fix PushProtocol output spec

Add key property in PushProtocol

Add test implementation

Add PubSub integration test

Fix formatting

Refactor RedisPubSub

Apply RedisPubSub refactoring

Apply RedisPubSub refactoring to t/c

Remove unused file

Fix logic bugs

Fix broken t/c

Fix unsubscribe process

Fix pubSubSpec

Add request message broker in SingleNodeRedisPubSub

Simplify RedisPubSub's public api

Revert unrelated changes
  • Loading branch information
0pg committed Jan 31, 2023
1 parent 01e43af commit a280504
Show file tree
Hide file tree
Showing 19 changed files with 903 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ object BenchmarkRuntime {
private final val Layer =
ZLayer.make[Redis](
RedisExecutor.local,
RedisPubSub.local,
ZLayer.succeed[BinaryCodec](ProtobufCodec),
RedisLive.layer
)
Expand Down
3 changes: 2 additions & 1 deletion example/src/main/scala/example/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import example.config.AppConfig
import sttp.client3.httpclient.zio.HttpClientZioBackend
import zhttp.service.Server
import zio._
import zio.redis.{RedisExecutor, RedisLive}
import zio.redis.{RedisExecutor, RedisLive, RedisPubSub}
import zio.schema.codec.{BinaryCodec, ProtobufCodec}

object Main extends ZIOAppDefault {
Expand All @@ -33,6 +33,7 @@ object Main extends ZIOAppDefault {
ContributorsCache.layer,
HttpClientZioBackend.layer(),
RedisExecutor.layer,
RedisPubSub.layer,
RedisLive.layer,
ZLayer.succeed[BinaryCodec](ProtobufCodec)
)
Expand Down
46 changes: 46 additions & 0 deletions redis/src/main/scala/zio/redis/Output.scala
Original file line number Diff line number Diff line change
Expand Up @@ -827,4 +827,50 @@ object Output {
case other => throw ProtocolError(s"$other isn't an array")
}
}

case object PushProtocolOutput extends Output[PushProtocol] {
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): PushProtocol =
respValue match {
case RespValue.NullArray => throw ProtocolError(s"Array must not be empty")
case RespValue.Array(values) =>
val name = MultiStringOutput.unsafeDecode(values(0))
val key = MultiStringOutput.unsafeDecode(values(1))
name match {
case "subscribe" =>
val num = LongOutput.unsafeDecode(values(2))
PushProtocol.Subscribe(key, num)
case "psubscribe" =>
val num = LongOutput.unsafeDecode(values(2))
PushProtocol.PSubscribe(key, num)
case "unsubscribe" =>
val num = LongOutput.unsafeDecode(values(2))
PushProtocol.Unsubscribe(key, num)
case "punsubscribe" =>
val num = LongOutput.unsafeDecode(values(2))
PushProtocol.PUnsubscribe(key, num)
case "message" =>
val message = values(2)
PushProtocol.Message(key, message)
case "pmessage" =>
val channel = MultiStringOutput.unsafeDecode(values(2))
val message = values(3)
PushProtocol.PMessage(key, channel, message)
case other => throw ProtocolError(s"$other isn't a pushed message")
}
case other => throw ProtocolError(s"$other isn't an array")
}
}

case object NumSubResponseOutput extends Output[Chunk[NumSubResponse]] {
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Chunk[NumSubResponse] =
respValue match {
case RespValue.Array(values) =>
Chunk.fromIterator(values.grouped(2).map { chunk =>
val channel = MultiStringOutput.unsafeDecode(chunk(0))
val numOfSubscription = LongOutput.unsafeDecode(chunk(1))
NumSubResponse(channel, numOfSubscription)
})
case other => throw ProtocolError(s"$other isn't an array")
}
}
}
13 changes: 10 additions & 3 deletions redis/src/main/scala/zio/redis/Redis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,18 @@ trait Redis
with api.Cluster {
def codec: BinaryCodec
def executor: RedisExecutor
def pubSub: RedisPubSub
}

final case class RedisLive(codec: BinaryCodec, executor: RedisExecutor) extends Redis
final case class RedisLive(codec: BinaryCodec, executor: RedisExecutor, pubSub: RedisPubSub) extends Redis

object RedisLive {
lazy val layer: URLayer[RedisExecutor with BinaryCodec, Redis] =
ZLayer.fromFunction(RedisLive.apply _)
lazy val layer: URLayer[RedisPubSub with RedisExecutor with BinaryCodec, Redis] =
ZLayer.fromZIO(
for {
codec <- ZIO.service[BinaryCodec]
executor <- ZIO.service[RedisExecutor]
pubSub <- ZIO.service[RedisPubSub]
} yield RedisLive(codec, executor, pubSub)
)
}
4 changes: 2 additions & 2 deletions redis/src/main/scala/zio/redis/RedisExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ trait RedisExecutor {

object RedisExecutor {
lazy val layer: ZLayer[RedisConfig, RedisError.IOError, RedisExecutor] =
RedisConnectionLive.layer >>> SingleNodeExecutor.layer
RedisConnectionLive.layer.fresh >>> SingleNodeExecutor.layer

lazy val local: ZLayer[Any, RedisError.IOError, RedisExecutor] =
RedisConnectionLive.default >>> SingleNodeExecutor.layer
RedisConnectionLive.default.fresh >>> SingleNodeExecutor.layer

lazy val test: ULayer[RedisExecutor] =
TestExecutor.layer
Expand Down
25 changes: 25 additions & 0 deletions redis/src/main/scala/zio/redis/RedisPubSub.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package zio.redis

import zio.schema.codec.BinaryCodec
import zio.stream._
import zio.{ULayer, ZIO, ZLayer}

trait RedisPubSub {
def execute(command: RedisPubSubCommand): ZStream[BinaryCodec, RedisError, PushProtocol]
}

object RedisPubSub {
lazy val layer: ZLayer[RedisConfig with BinaryCodec, RedisError.IOError, RedisPubSub] =
RedisConnectionLive.layer.fresh >>> pubSublayer

lazy val local: ZLayer[BinaryCodec, RedisError.IOError, RedisPubSub] =
RedisConnectionLive.default.fresh >>> pubSublayer

lazy val test: ULayer[RedisPubSub] =
TestExecutor.layer

private lazy val pubSublayer: ZLayer[RedisConnection with BinaryCodec, RedisError.IOError, RedisPubSub] =
ZLayer.scoped(
ZIO.service[RedisConnection].flatMap(SingleNodeRedisPubSub.create(_))
)
}
19 changes: 19 additions & 0 deletions redis/src/main/scala/zio/redis/RedisPubSubCommand.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package zio.redis

import zio.ZLayer
import zio.stream.ZStream

sealed abstract class RedisPubSubCommand

object RedisPubSubCommand {
case class Subscribe(channel: String, channels: List[String]) extends RedisPubSubCommand
case class PSubscribe(pattern: String, patterns: List[String]) extends RedisPubSubCommand
case class Unsubscribe(channels: List[String]) extends RedisPubSubCommand
case class PUnsubscribe(patterns: List[String]) extends RedisPubSubCommand

def run(command: RedisPubSubCommand): ZStream[Redis, RedisError, PushProtocol] =
ZStream.serviceWithStream { redis =>
val codecLayer = ZLayer.succeed(redis.codec)
redis.pubSub.execute(command).provideLayer(codecLayer)
}
}
5 changes: 5 additions & 0 deletions redis/src/main/scala/zio/redis/ResultBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package zio.redis
import zio.IO
import zio.redis.ResultBuilder.NeedsReturnType
import zio.schema.Schema
import zio.stream.ZStream

sealed trait ResultBuilder {
final def map(f: Nothing => Any)(implicit nrt: NeedsReturnType): IO[Nothing, Nothing] = ???
Expand Down Expand Up @@ -46,4 +47,8 @@ object ResultBuilder {
trait ResultOutputBuilder extends ResultBuilder {
def returning[R: Output]: IO[RedisError, R]
}

trait ResultOutputStreamBuilder {
def returning[R: Schema]: ZStream[Redis, RedisError, R]
}
}
169 changes: 169 additions & 0 deletions redis/src/main/scala/zio/redis/SingleNodeRedisPubSub.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package zio.redis

import zio.redis.Input.{NonEmptyList, StringInput, Varargs}
import zio.redis.Output.PushProtocolOutput
import zio.redis.SingleNodeRedisPubSub.{Request, RequestQueueSize, True}
import zio.redis.api.PubSub
import zio.schema.codec.BinaryCodec
import zio.stream._
import zio.{Chunk, ChunkBuilder, Hub, Promise, Queue, Ref, Schedule, UIO, ZIO}

import scala.reflect.ClassTag

final class SingleNodeRedisPubSub(
pubSubHubsRef: Ref[Map[SubscriptionKey, Hub[PushProtocol]]],
reqQueue: Queue[Request],
connection: RedisConnection
) extends RedisPubSub {

def execute(command: RedisPubSubCommand): ZStream[BinaryCodec, RedisError, PushProtocol] =
command match {
case RedisPubSubCommand.Subscribe(channel, channels) => subscribe(channel, channels)
case RedisPubSubCommand.PSubscribe(pattern, patterns) => pSubscribe(pattern, patterns)
case RedisPubSubCommand.Unsubscribe(channels) => unsubscribe(channels)
case RedisPubSubCommand.PUnsubscribe(patterns) => pUnsubscribe(patterns)
}

private def subscribe(
channel: String,
channels: List[String]
): ZStream[BinaryCodec, RedisError, PushProtocol] =
makeSubscriptionStream(PubSub.Subscribe, SubscriptionKey.Channel(channel), channels.map(SubscriptionKey.Channel(_)))

private def pSubscribe(
pattern: String,
patterns: List[String]
): ZStream[BinaryCodec, RedisError, PushProtocol] =
makeSubscriptionStream(
PubSub.PSubscribe,
SubscriptionKey.Pattern(pattern),
patterns.map(SubscriptionKey.Pattern(_))
)

private def unsubscribe(channels: List[String]): ZStream[BinaryCodec, RedisError, PushProtocol] =
makeUnsubscriptionStream(PubSub.Unsubscribe, channels.map(SubscriptionKey.Channel(_)))

private def pUnsubscribe(patterns: List[String]): ZStream[BinaryCodec, RedisError, PushProtocol] =
makeUnsubscriptionStream(PubSub.PUnsubscribe, patterns.map(SubscriptionKey.Pattern(_)))

private def makeSubscriptionStream(command: String, key: SubscriptionKey, keys: List[SubscriptionKey]) =
ZStream.unwrap[BinaryCodec, RedisError, PushProtocol](
ZIO.serviceWithZIO[BinaryCodec] { implicit codec =>
for {
promise <- Promise.make[RedisError, Unit]
chunk = StringInput.encode(command) ++ NonEmptyList(StringInput).encode((key.value, keys.map(_.value)))
stream <- makeStream(key :: keys)
_ <- reqQueue.offer(Request(chunk, promise))
_ <- promise.await
} yield stream
}
)

private def makeUnsubscriptionStream[T <: SubscriptionKey: ClassTag](command: String, keys: List[T]) =
ZStream.unwrap[BinaryCodec, RedisError, PushProtocol](
ZIO.serviceWithZIO[BinaryCodec] { implicit codec =>
for {
targets <- if (keys.isEmpty) pubSubHubsRef.get.map(_.keys.collect { case t: T => t }.toList)
else ZIO.succeedNow(keys)
chunk = StringInput.encode(command) ++ Varargs(StringInput).encode(keys.map(_.value))
promise <- Promise.make[RedisError, Unit]
stream <- makeStream(targets)
_ <- reqQueue.offer(Request(chunk, promise))
_ <- promise.await
} yield stream
}
)

private def makeStream(keys: List[SubscriptionKey]): UIO[Stream[RedisError, PushProtocol]] =
for {
streams <- ZIO.foreach(keys)(getHub(_).map(ZStream.fromHub(_)))
stream = streams.fold(ZStream.empty)(_ merge _)
} yield stream

private def getHub(key: SubscriptionKey) = {
def makeNewHub =
Hub
.unbounded[PushProtocol]
.tap(hub => pubSubHubsRef.update(_ + (key -> hub)))

for {
hubs <- pubSubHubsRef.get
hub <- ZIO.fromOption(hubs.get(key)).orElse(makeNewHub)
} yield hub
}

private def send =
reqQueue.takeBetween(1, RequestQueueSize).flatMap { reqs =>
val buffer = ChunkBuilder.make[Byte]()
val it = reqs.iterator

while (it.hasNext) {
val req = it.next()
buffer ++= RespValue.Array(req.command).serialize
}

val bytes = buffer.result()

connection
.write(bytes)
.mapError(RedisError.IOError(_))
.tapBoth(
e => ZIO.foreachDiscard(reqs)(_.promise.fail(e)),
_ => ZIO.foreachDiscard(reqs)(_.promise.succeed(()))
)
}

private def receive: ZIO[BinaryCodec, RedisError, Unit] =
ZIO.serviceWithZIO[BinaryCodec] { implicit codec =>
connection.read
.mapError(RedisError.IOError(_))
.via(RespValue.decoder)
.collectSome
.mapZIO(resp => ZIO.attempt(PushProtocolOutput.unsafeDecode(resp)))
.refineToOrDie[RedisError]
.foreach(push => getHub(push.key).flatMap(_.offer(push)))
}

private def resubscribe: ZIO[BinaryCodec, RedisError, Unit] =
ZIO.serviceWithZIO[BinaryCodec] { implicit codec =>
def makeCommand(name: String, keys: Set[String]) =
RespValue.Array(StringInput.encode(name) ++ Varargs(StringInput).encode(keys)).serialize

for {
keySet <- pubSubHubsRef.get.map(_.keySet)
(channels, patterns) = keySet.partition(_.isChannelKey)
_ <- (connection.write(makeCommand(PubSub.Subscribe, channels.map(_.value))).when(channels.nonEmpty) *>
connection.write(makeCommand(PubSub.PSubscribe, patterns.map(_.value))).when(patterns.nonEmpty))
.mapError(RedisError.IOError(_))
.retryWhile(True)
} yield ()
}

/**
* Opens a connection to the server and launches receive operations. All failures are retried by opening a new
* connection. Only exits by interruption or defect.
*/
val run: ZIO[BinaryCodec, RedisError, AnyVal] =
ZIO.logTrace(s"$this Executable sender and reader has been started") *>
(send.repeat[BinaryCodec, Long](Schedule.forever) race receive)
.tapError(e => ZIO.logWarning(s"Reconnecting due to error: $e") *> resubscribe)
.retryWhile(True)
.tapError(e => ZIO.logError(s"Executor exiting: $e"))
}

object SingleNodeRedisPubSub {
final case class Request(command: Chunk[RespValue.BulkString], promise: Promise[RedisError, Unit])

private final val True: Any => Boolean = _ => true

private final val RequestQueueSize = 16

def create(conn: RedisConnection) =
for {
hubRef <- Ref.make(Map.empty[SubscriptionKey, Hub[PushProtocol]])
reqQueue <- Queue.bounded[Request](RequestQueueSize)
pubSub = new SingleNodeRedisPubSub(hubRef, reqQueue, conn)
_ <- pubSub.run.forkScoped
_ <- logScopeFinalizer(s"$pubSub Node PubSub is closed")
} yield pubSub
}
Loading

0 comments on commit a280504

Please sign in to comment.