Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] exposing RedisEnvironment via environment #739

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions redis/src/main/scala/zio/redis/ClusterExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ final case class ClusterExecutor(
def executeAsk(address: RedisUri) =
for {
executor <- executor(address)
_ <- executor.execute(AskingCommand(StringUtf8Codec, this).resp(()))
_ <- executor.execute(AskingCommand.resp((), StringUtf8Codec))
res <- executor.execute(command)
} yield res

Expand Down Expand Up @@ -130,14 +130,14 @@ object ClusterExecutor {

private def connectToCluster(address: RedisUri) =
for {
temporaryRedis <- redis(address)
(trLayer, trScope) = temporaryRedis
partitions <- ZIO.serviceWithZIO[Redis](_.slots).provideLayer(trLayer)
_ <- ZIO.logTrace(s"Cluster configs:\n${partitions.mkString("\n")}")
uniqueAddresses = partitions.map(_.master.address).distinct
uriExecScope <- ZIO.foreachPar(uniqueAddresses)(address => connectToNode(address).map(es => address -> es))
slots = slotAddress(partitions)
_ <- trScope.close(Exit.unit)
temporaryRedis <- redis(address)
(trLayer, reLayer, trScope) = temporaryRedis
partitions <- ZIO.serviceWithZIO[Redis](_.slots).provide(trLayer, reLayer)
_ <- ZIO.logTrace(s"Cluster configs:\n${partitions.mkString("\n")}")
uniqueAddresses = partitions.map(_.master.address).distinct
uriExecScope <- ZIO.foreachPar(uniqueAddresses)(address => connectToNode(address).map(es => address -> es))
slots = slotAddress(partitions)
_ <- trScope.close(Exit.unit)
} yield ClusterConnection(partitions, uriExecScope.toMap, slots)

private def connectToNode(address: RedisUri) =
Expand All @@ -152,12 +152,14 @@ object ClusterExecutor {
private def redis(address: RedisUri) = {
val executorLayer = ZLayer.succeed(RedisConfig(address.host, address.port)) >>> RedisExecutor.layer
val codecLayer = ZLayer.succeed[BinaryCodec](StringUtf8Codec)
val redisLayer = executorLayer ++ codecLayer >>> RedisLive.layer
val redisEnvLayer = executorLayer ++ codecLayer >>> RedisEnvironment.layer
val redisLayer = RedisLive.layer
for {
closableScope <- Scope.make
layer <- closableScope.extend[Any](redisLayer.memoize)
rLayer <- closableScope.extend[Any](redisLayer.memoize)
reLayer <- closableScope.extend[Any](redisEnvLayer.memoize)
_ <- logScopeFinalizer("Temporary redis connection is closed")
} yield (layer, closableScope)
} yield (rLayer, reLayer, closableScope)
}

private def slotAddress(partitions: Chunk[Partition]) =
Expand Down
12 changes: 4 additions & 8 deletions redis/src/main/scala/zio/redis/Redis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package zio.redis

import zio._
import zio.schema.codec.BinaryCodec

trait Redis
extends api.Connection
Expand All @@ -31,14 +30,11 @@ trait Redis
with api.SortedSets
with api.Streams
with api.Scripting
with api.Cluster {
def codec: BinaryCodec
def executor: RedisExecutor
}
with api.Cluster

final case class RedisLive(codec: BinaryCodec, executor: RedisExecutor) extends Redis
object Redis extends Redis

object RedisLive {
lazy val layer: URLayer[RedisExecutor with BinaryCodec, Redis] =
ZLayer.fromFunction(RedisLive.apply _)
lazy val layer: ULayer[Redis] =
ZLayer.succeed(Redis)
Comment on lines +38 to +39
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, you may as well remove the RedisLive object and pack everything in Redis, e.g.:

object Redis {
  lazy val layer: ULayer[Redis] = ZLayer.succeed(new Redis)
}

In the future we may consider renaming it to live, perhaps it'll be more consistent with the ecosystem.

}
33 changes: 12 additions & 21 deletions redis/src/main/scala/zio/redis/RedisCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,22 @@ import zio._
import zio.redis.Input.{StringInput, Varargs}
import zio.schema.codec.BinaryCodec

final class RedisCommand[-In, +Out] private (
val name: String,
val input: Input[In],
val output: Output[Out],
val codec: BinaryCodec,
val executor: RedisExecutor
) {
final class RedisCommand[-In, +Out] private (val name: String, val input: Input[In], val output: Output[Out]) {

private[redis] def run(in: In): IO[RedisError, Out] =
executor
.execute(resp(in))
.flatMap[Any, Throwable, Out](out => ZIO.attempt(output.unsafeDecode(out)(codec)))
.refineToOrDie[RedisError]
private[redis] def run(in: In): ZIO[RedisEnvironment, RedisError, Out] =
for {
env <- ZIO.service[RedisEnvironment]
result <- env.executor
.execute(resp(in, env.codec))
.flatMap[Any, Throwable, Out](out => ZIO.attempt(output.unsafeDecode(out)(env.codec)))
.refineToOrDie[RedisError]
} yield result

private[redis] def resp(in: In): Chunk[RespValue.BulkString] =
private[redis] def resp(in: In, codec: BinaryCodec): Chunk[RespValue.BulkString] =
Varargs(StringInput).encode(name.split(" "))(codec) ++ input.encode(in)(codec)
}

object RedisCommand {
private[redis] def apply[In, Out](
name: String,
input: Input[In],
output: Output[Out],
codec: BinaryCodec,
executor: RedisExecutor
): RedisCommand[In, Out] =
new RedisCommand(name, input, output, codec, executor)
private[redis] def apply[In, Out](name: String, input: Input[In], output: Output[Out]): RedisCommand[In, Out] =
new RedisCommand(name, input, output)
}
13 changes: 10 additions & 3 deletions redis/src/main/scala/zio/redis/RedisEnvironment.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
package zio.redis

import zio._
import zio.schema.codec.BinaryCodec

private[redis] trait RedisEnvironment {
def codec: BinaryCodec
def executor: RedisExecutor
final case class RedisEnvironment(codec: BinaryCodec, executor: RedisExecutor)

private[redis] object RedisEnvironment {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if this should be private at all now that it's a part of all commands. In fact, I'm thinking if it would be wise to replace it with BinaryCodec with RedisExecutor.

lazy val layer = ZLayer {
for {
codec <- ZIO.service[BinaryCodec]
executor <- ZIO.service[RedisExecutor]
} yield new RedisEnvironment(codec, executor)
}
Comment on lines +9 to +14
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super nitpick:

Suggested change
lazy val layer = ZLayer {
for {
codec <- ZIO.service[BinaryCodec]
executor <- ZIO.service[RedisExecutor]
} yield new RedisEnvironment(codec, executor)
}
lazy val layer =
ZLayer {
for {
codec <- ZIO.service[BinaryCodec]
executor <- ZIO.service[RedisExecutor]
} yield new RedisEnvironment(codec, executor)
}

}
10 changes: 5 additions & 5 deletions redis/src/main/scala/zio/redis/ResultBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package zio.redis

import zio.ZIO
import zio._
import zio.redis.ResultBuilder.NeedsReturnType
import zio.schema.Schema

Expand All @@ -33,18 +33,18 @@ object ResultBuilder {
final abstract class NeedsReturnType

trait ResultBuilder1[+F[_]] extends ResultBuilder {
def returning[R: Schema]: ZIO[Redis, RedisError, F[R]]
def returning[R: Schema]: ZIO[RedisEnvironment, RedisError, F[R]]
}

trait ResultBuilder2[+F[_, _]] extends ResultBuilder {
def returning[R1: Schema, R2: Schema]: ZIO[Redis, RedisError, F[R1, R2]]
def returning[R1: Schema, R2: Schema]: ZIO[RedisEnvironment, RedisError, F[R1, R2]]
}

trait ResultBuilder3[+F[_, _, _]] extends ResultBuilder {
def returning[R1: Schema, R2: Schema, R3: Schema]: ZIO[Redis, RedisError, F[R1, R2, R3]]
def returning[R1: Schema, R2: Schema, R3: Schema]: ZIO[RedisEnvironment, RedisError, F[R1, R2, R3]]
}

trait ResultOutputBuilder extends ResultBuilder {
def returning[R: Output]: ZIO[Redis, RedisError, R]
def returning[R: Output]: ZIO[RedisEnvironment, RedisError, R]
}
}
53 changes: 18 additions & 35 deletions redis/src/main/scala/zio/redis/api/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ import zio.redis._
import zio.redis.api.Cluster.{AskingCommand, ClusterSetSlots, ClusterSlots}
import zio.redis.options.Cluster.SetSlotSubCommand._
import zio.redis.options.Cluster.{Partition, Slot}
import zio.schema.codec.BinaryCodec
import zio.{Chunk, IO}
import zio.{Chunk, ZIO}

trait Cluster extends RedisEnvironment {
trait Cluster {

/**
* When a cluster client receives an -ASK redirect, the ASKING command is sent to the target node followed by the
Expand All @@ -34,17 +33,17 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
def asking: IO[RedisError, Unit] =
AskingCommand(codec, executor).run(())
def asking: ZIO[RedisEnvironment, RedisError, Unit] =
AskingCommand.run(())

/**
* Returns details about which cluster slots map to which Redis instances.
*
* @return
* details about which cluster
*/
def slots: IO[RedisError, Chunk[Partition]] = {
val command = RedisCommand(ClusterSlots, NoInput, ChunkOutput(ClusterPartitionOutput), codec, executor)
def slots: ZIO[RedisEnvironment, RedisError, Chunk[Partition]] = {
val command = RedisCommand(ClusterSlots, NoInput, ChunkOutput(ClusterPartitionOutput))
command.run(())
}

Expand All @@ -56,9 +55,9 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
def setSlotStable(slot: Slot): IO[RedisError, Unit] = {
def setSlotStable(slot: Slot): ZIO[RedisEnvironment, RedisError, Unit] = {
val command =
RedisCommand(ClusterSetSlots, Tuple2(LongInput, ArbitraryInput[String]()), UnitOutput, codec, executor)
RedisCommand(ClusterSetSlots, Tuple2(LongInput, ArbitraryInput[String]()), UnitOutput)
command.run((slot.number, Stable.stringify))
}

Expand All @@ -73,14 +72,9 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
def setSlotMigrating(slot: Slot, nodeId: String): IO[RedisError, Unit] = {
val command = RedisCommand(
ClusterSetSlots,
Tuple3(LongInput, ArbitraryInput[String](), ArbitraryInput[String]()),
UnitOutput,
codec,
executor
)
def setSlotMigrating(slot: Slot, nodeId: String): ZIO[RedisEnvironment, RedisError, Unit] = {
val command =
RedisCommand(ClusterSetSlots, Tuple3(LongInput, ArbitraryInput[String](), ArbitraryInput[String]()), UnitOutput)
command.run((slot.number, Migrating.stringify, nodeId))
}

Expand All @@ -95,14 +89,9 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
def setSlotImporting(slot: Slot, nodeId: String): IO[RedisError, Unit] = {
val command = RedisCommand(
ClusterSetSlots,
Tuple3(LongInput, ArbitraryInput[String](), ArbitraryInput[String]()),
UnitOutput,
codec,
executor
)
def setSlotImporting(slot: Slot, nodeId: String): ZIO[RedisEnvironment, RedisError, Unit] = {
val command =
RedisCommand(ClusterSetSlots, Tuple3(LongInput, ArbitraryInput[String](), ArbitraryInput[String]()), UnitOutput)
command.run((slot.number, Importing.stringify, nodeId))
}

Expand All @@ -117,14 +106,9 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
def setSlotNode(slot: Slot, nodeId: String): IO[RedisError, Unit] = {
val command = RedisCommand(
ClusterSetSlots,
Tuple3(LongInput, ArbitraryInput[String](), ArbitraryInput[String]()),
UnitOutput,
codec,
executor
)
def setSlotNode(slot: Slot, nodeId: String): ZIO[RedisEnvironment, RedisError, Unit] = {
val command =
RedisCommand(ClusterSetSlots, Tuple3(LongInput, ArbitraryInput[String](), ArbitraryInput[String]()), UnitOutput)
command.run((slot.number, Node.stringify, nodeId))
}
}
Expand All @@ -134,6 +118,5 @@ private[redis] object Cluster {
final val ClusterSlots = "CLUSTER SLOTS"
final val ClusterSetSlots = "CLUSTER SETSLOT"

final val AskingCommand: (BinaryCodec, RedisExecutor) => RedisCommand[Unit, Unit] =
RedisCommand(Asking, NoInput, UnitOutput, _, _)
final val AskingCommand: RedisCommand[Unit, Unit] = RedisCommand(Asking, NoInput, UnitOutput)
}
Loading