Skip to content

Commit

Permalink
Fix broken compile
Browse files Browse the repository at this point in the history
  • Loading branch information
0pg committed May 7, 2023
1 parent 1a0862f commit 92457c4
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package zio.redis

import zio.{&, URLayer, ZIO, ZLayer}
import zio._

trait RedisSubscription extends api.Subscription

Expand All @@ -12,7 +12,7 @@ object RedisSubscription {
SubscriptionExecutor.layer >>> makeLayer

private def makeLayer: URLayer[CodecSupplier & SubscriptionExecutor, RedisSubscription] =
ZLayer.fromFunction(Live.apply _)
ZLayer.fromFunction(Live.apply _)

private final case class Live(codecSupplier: CodecSupplier, executor: SubscriptionExecutor) extends RedisSubscription
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import zio.redis.Input.{CommandNameInput, StringInput}
import zio.redis.Output.PushProtocolOutput
import zio.redis.SingleNodeSubscriptionExecutor.{Request, RequestQueueSize, True}
import zio.redis.api.Subscription
import zio.redis.internal.{RedisConnection, RespCommand, RespCommandArgument, RespValue, logScopeFinalizer}
import zio.redis.internal._
import zio.redis.options.PubSub.PushProtocol
import zio.stream._
import zio.{Chunk, ChunkBuilder, IO, Promise, Queue, Ref, Schedule, ZIO}
import zio.{Chunk, ChunkBuilder, IO, Promise, Queue, Ref, Schedule, Scope, URIO, ZIO}

final class SingleNodeSubscriptionExecutor(
private[redis] final class SingleNodeSubscriptionExecutor private (
channelSubsRef: Ref[Map[String, Chunk[Queue[Take[RedisError, PushProtocol]]]]],
patternSubsRef: Ref[Map[String, Chunk[Queue[Take[RedisError, PushProtocol]]]]],
reqQueue: Queue[Request],
Expand Down Expand Up @@ -41,16 +41,17 @@ final class SingleNodeSubscriptionExecutor(
ZStream
.fromZIO(
for {
queues <- ZIO.foreach(command.args.collect { case key: RespCommandArgument.Key => key.value.asString })(key =>
Queue
.unbounded[Take[RedisError, PushProtocol]]
.tap(queue =>
subscriptionRef.update(
_.updatedWith(key)(_.map(_ appended queue).orElse(Some(Chunk.single(queue))))
)
)
.map(key -> _)
)
queues <-
ZIO.foreach(command.args.collect { case key: RespCommandArgument.Key => key.value.asString })(key =>
Queue
.unbounded[Take[RedisError, PushProtocol]]
.tap(queue =>
subscriptionRef.update(subscription =>
subscription.updated(key, subscription.getOrElse(key, Chunk.empty) ++ Chunk.single(queue))
)
)
.map(key -> _)
)
promise <- Promise.make[RedisError, Unit]
_ <- reqQueue.offer(
Request(
Expand All @@ -61,7 +62,14 @@ final class SingleNodeSubscriptionExecutor(
streams = queues.map { case (key, queue) =>
ZStream
.fromQueueWithShutdown(queue)
.ensuring(subscriptionRef.update(_.updatedWith(key)(_.map(_.filterNot(_ == queue)))))
.ensuring(
subscriptionRef.update(subscription =>
subscription.get(key) match {
case Some(queues) => subscription.updated(key, queues.filterNot(_ == queue))
case None => subscription
}
)
)
}
_ <- promise.await.tapError(_ => ZIO.foreachDiscard(queues) { case (_, queue) => queue.shutdown })
} yield streams.fold(ZStream.empty)(_ merge _)
Expand Down Expand Up @@ -173,7 +181,7 @@ final class SingleNodeSubscriptionExecutor(
.tapError(e => ZIO.logError(s"Executor exiting: $e"))
}

object SingleNodeSubscriptionExecutor {
private[redis] object SingleNodeSubscriptionExecutor {
private final case class Request(
command: Chunk[RespValue.BulkString],
promise: Promise[RedisError, Unit]
Expand All @@ -183,7 +191,7 @@ object SingleNodeSubscriptionExecutor {

private final val RequestQueueSize = 16

def create(conn: RedisConnection) =
def create(conn: RedisConnection): URIO[Scope, SubscriptionExecutor] =
for {
reqQueue <- Queue.bounded[Request](RequestQueueSize)
channelRef <- Ref.make(Map.empty[String, Chunk[Queue[Take[RedisError, PushProtocol]]]])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import zio.schema.codec.BinaryCodec
import zio.stream._
import zio.{Chunk, IO, ZIO}

private[redis] final case class RedisSubscriptionCommand(executor: SubscriptionExecutor) extends {
private[redis] final case class RedisSubscriptionCommand(executor: SubscriptionExecutor) {
import zio.redis.options.PubSub.PushProtocol._

def subscribe[A: BinaryCodec](
Expand Down

0 comments on commit 92457c4

Please sign in to comment.