Skip to content

Commit

Permalink
Implement PubSub api (#714)
Browse files Browse the repository at this point in the history
  • Loading branch information
0pg authored Jul 27, 2023
1 parent f75676d commit 407b9ac
Show file tree
Hide file tree
Showing 19 changed files with 1,168 additions and 23 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ lazy val redis =
.settings(
libraryDependencies ++= List(
"dev.zio" %% "zio-streams" % zioVersion,
"dev.zio" %% "zio-concurrent" % zioVersion,
"dev.zio" %% "zio-schema" % zioSchemaVersion,
"dev.zio" %% "zio-schema-protobuf" % zioSchemaVersion % Test,
"dev.zio" %% "zio-test" % zioVersion % Test,
Expand Down
49 changes: 49 additions & 0 deletions modules/redis/src/main/scala/zio/redis/Output.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package zio.redis

import zio._
import zio.redis.internal.PubSub.{PushMessage, SubscriptionKey}
import zio.redis.internal.RespValue
import zio.redis.options.Cluster.{Node, Partition, SlotRange}
import zio.schema.Schema
Expand Down Expand Up @@ -638,6 +639,54 @@ object Output {
}
}

private[redis] case object PushMessageOutput extends Output[PushMessage] {
protected def tryDecode(respValue: RespValue): PushMessage =
respValue match {
case RespValue.NullArray => throw ProtocolError(s"Array must not be empty")
case RespValue.Array(values) =>
val name = MultiStringOutput.unsafeDecode(values(0))
val key = MultiStringOutput.unsafeDecode(values(1))
name match {
case "subscribe" =>
val num = LongOutput.unsafeDecode(values(2))
PushMessage.Subscribed(SubscriptionKey.Channel(key), num)
case "psubscribe" =>
val num = LongOutput.unsafeDecode(values(2))
PushMessage.Subscribed(SubscriptionKey.Pattern(key), num)
case "unsubscribe" =>
val num = LongOutput.unsafeDecode(values(2))
PushMessage.Unsubscribed(SubscriptionKey.Channel(key), num)
case "punsubscribe" =>
val num = LongOutput.unsafeDecode(values(2))
PushMessage.Unsubscribed(SubscriptionKey.Pattern(key), num)
case "message" =>
val message = values(2)
PushMessage.Message(SubscriptionKey.Channel(key), key, message)
case "pmessage" =>
val channel = MultiStringOutput.unsafeDecode(values(2))
val message = values(3)
PushMessage.Message(SubscriptionKey.Pattern(key), channel, message)
case other => throw ProtocolError(s"$other isn't a pushed message")
}
case other => throw ProtocolError(s"$other isn't an array")
}
}

case object NumSubResponseOutput extends Output[Map[String, Long]] {
protected def tryDecode(respValue: RespValue): Map[String, Long] =
respValue match {
case RespValue.Array(values) =>
val builder = Map.newBuilder[String, Long]
values.grouped(2).foreach { chunk =>
val channel = MultiStringOutput.unsafeDecode(chunk(0))
val numOfSubs = LongOutput.unsafeDecode(chunk(1))
builder += channel -> numOfSubs
}
builder.result()
case other => throw ProtocolError(s"$other isn't an array")
}
}

private def decodeDouble(bytes: Chunk[Byte]): Double = {
val text = new String(bytes.toArray, StandardCharsets.UTF_8)
try text.toDouble
Expand Down
1 change: 1 addition & 0 deletions modules/redis/src/main/scala/zio/redis/Redis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ trait Redis
with api.Streams
with api.Scripting
with api.Cluster
with api.Publishing

object Redis {
lazy val cluster: ZLayer[CodecSupplier & RedisClusterConfig, RedisError, Redis] =
Expand Down
12 changes: 11 additions & 1 deletion modules/redis/src/main/scala/zio/redis/RedisError.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package zio.redis

import zio.redis.internal.PubSub.SubscriptionKey
import zio.redis.internal.RespCommand
import zio.redis.options.Cluster.Slot

import java.io.IOException
Expand Down Expand Up @@ -46,5 +48,13 @@ object RedisError {
object Moved {
def apply(slotAndAddress: (Slot, RedisUri)): Moved = Moved(slotAndAddress._1, slotAndAddress._2)
}
final case class IOError(exception: IOException) extends RedisError
final case class IOError(exception: IOException) extends RedisError
final case class CommandNameNotFound(message: String) extends RedisError
object CommandNameNotFound {
def apply(command: RespCommand): CommandNameNotFound = CommandNameNotFound(command.args.toString())
}

sealed trait PubSubError extends RedisError
final case class InvalidPubSubCommand(command: String) extends PubSubError
final case class SubscriptionStreamAlreadyClosed(key: SubscriptionKey) extends PubSubError
}
35 changes: 35 additions & 0 deletions modules/redis/src/main/scala/zio/redis/RedisSubscription.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2021 John A. De Goes and the ZIO contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package zio.redis

import zio._
import zio.redis.internal.SubscriptionExecutor

trait RedisSubscription extends api.Subscription

object RedisSubscription {
lazy val local: ZLayer[CodecSupplier, RedisError.IOError, RedisSubscription] =
SubscriptionExecutor.local >>> makeLayer

lazy val singleNode: ZLayer[CodecSupplier & RedisConfig, RedisError.IOError, RedisSubscription] =
SubscriptionExecutor.layer >>> makeLayer

private def makeLayer: URLayer[CodecSupplier & SubscriptionExecutor, RedisSubscription] =
ZLayer.fromFunction(Live.apply _)

private final case class Live(codecSupplier: CodecSupplier, executor: SubscriptionExecutor) extends RedisSubscription
}
5 changes: 5 additions & 0 deletions modules/redis/src/main/scala/zio/redis/ResultBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package zio.redis
import zio.IO
import zio.redis.ResultBuilder.NeedsReturnType
import zio.schema.Schema
import zio.stream.Stream

sealed trait ResultBuilder {
final def map(f: Nothing => Any)(implicit nrt: NeedsReturnType): IO[Nothing, Nothing] = ???
Expand All @@ -45,4 +46,8 @@ object ResultBuilder {
trait ResultOutputBuilder extends ResultBuilder {
def returning[R: Output]: IO[RedisError, R]
}

trait ResultStreamBuilder1[+F[_]] extends ResultBuilder {
def returning[R: Schema]: Stream[RedisError, F[R]]
}
}
89 changes: 89 additions & 0 deletions modules/redis/src/main/scala/zio/redis/api/Publishing.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2021 John A. De Goes and the ZIO contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package zio.redis.api

import zio.redis.Input._
import zio.redis.Output._
import zio.redis._
import zio.redis.internal.{RedisCommand, RedisEnvironment}
import zio.schema.Schema
import zio.{Chunk, IO}

trait Publishing extends RedisEnvironment {
import Publishing._

/**
* Posts a message to the given channel.
*
* @param channel
* Target channel name for publishing messages.
* @param message
* The value of the message to be published to the channel.
* @return
* Returns the number of clients that received the message.
*/
final def publish[A: Schema](channel: String, message: A): IO[RedisError, Long] = {
val command = RedisCommand(Publish, Tuple2(StringInput, ArbitraryKeyInput[A]()), LongOutput, executor)
command.run((channel, message))
}

/**
* Lists the currently active channel that has one or more subscribers (excluding clients subscribed to patterns).
*
* @param pattern
* Pattern to get matching channels.
* @return
* Returns a list of active channels matching the specified pattern.
*/
final def pubSubChannels(pattern: String): IO[RedisError, Chunk[String]] = {
val command = RedisCommand(PubSubChannels, StringInput, ChunkOutput(MultiStringOutput), executor)
command.run(pattern)
}

/**
* The number of unique patterns that are subscribed to by clients.
*
* @return
* Returns the number of patterns all the clients are subscribed to.
*/
final def pubSubNumPat: IO[RedisError, Long] = {
val command = RedisCommand(PubSubNumPat, NoInput, LongOutput, executor)
command.run(())
}

/**
* The number of subscribers (exclusive of clients subscribed to patterns) for the specified channels.
*
* @param channel
* Channel name to get the number of subscribers.
* @param channels
* Channel names to get the number of subscribers.
* @return
* Returns a map of channel and number of subscribers for channel.
*/
final def pubSubNumSub(channel: String, channels: String*): IO[RedisError, Map[String, Long]] = {
val command = RedisCommand(PubSubNumSub, NonEmptyList(StringInput), NumSubResponseOutput, executor)
command.run((channel, channels.toList))
}
}

private[redis] object Publishing {
final val Publish = "PUBLISH"
final val PubSubChannels = "PUBSUB CHANNELS"
final val PubSubNumPat = "PUBSUB NUMPAT"
final val PubSubNumSub = "PUBSUB NUMSUB"
}
Loading

0 comments on commit 407b9ac

Please sign in to comment.