Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support async command evaluation #917

Merged
merged 24 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions modules/redis/src/main/scala/zio/redis/GenRedis.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package zio.redis
import zio.{IO, UIO}

private[redis] trait GenRedis[G[+_]]
extends api.Connection[G]
with api.Geo[G]
with api.Hashes[G]
with api.HyperLogLog[G]
with api.Keys[G]
with api.Lists[G]
with api.Sets[G]
with api.Strings[G]
with api.SortedSets[G]
with api.Streams[G]
with api.Scripting[G]
with api.Cluster[G]
with api.Publishing[G]

private[redis] object GenRedis {
type Async[+A] = UIO[IO[RedisError, A]]
type Sync[+A] = IO[RedisError, A]

def async[A](io: UIO[IO[RedisError, A]]) = io
def sync[A](io: UIO[IO[RedisError, A]]) = io.flatten
}
39 changes: 16 additions & 23 deletions modules/redis/src/main/scala/zio/redis/Redis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,31 @@ package zio.redis
import zio._
import zio.redis.internal._

trait Redis
extends api.Connection
with api.Geo
with api.Hashes
with api.HyperLogLog
with api.Keys
with api.Lists
with api.Sets
with api.Strings
with api.SortedSets
with api.Streams
with api.Scripting
with api.Cluster
with api.Publishing

object Redis {
lazy val cluster: ZLayer[CodecSupplier & RedisClusterConfig, RedisError, Redis] =
ClusterExecutor.layer >>> makeLayer
ZLayer.makeSome[CodecSupplier & RedisClusterConfig, Redis](ClusterExecutor.layer, makeLayer)

lazy val local: ZLayer[CodecSupplier, RedisError.IOError, Redis] =
lazy val local: ZLayer[CodecSupplier, RedisError.IOError, Redis & AsyncRedis] =
SingleNodeExecutor.local >>> makeLayer

lazy val singleNode: ZLayer[CodecSupplier & RedisConfig, RedisError.IOError, Redis] =
lazy val singleNode: ZLayer[CodecSupplier & RedisConfig, RedisError.IOError, Redis & AsyncRedis] =
SingleNodeExecutor.layer >>> makeLayer

private def makeLayer: URLayer[CodecSupplier & RedisExecutor, Redis] =
ZLayer {
private def makeLayer: URLayer[CodecSupplier & RedisExecutor, AsyncRedis & Redis] =
ZLayer.fromZIOEnvironment {
for {
codecSupplier <- ZIO.service[CodecSupplier]
executor <- ZIO.service[RedisExecutor]
} yield new Live(codecSupplier, executor)
} yield ZEnvironment[AsyncRedis, Redis](
new AsyncLive(codecSupplier, executor),
new SyncLive(codecSupplier, executor)
)
}

private final class Live(val codecSupplier: CodecSupplier, val executor: RedisExecutor) extends Redis
private final class SyncLive(val codecSupplier: CodecSupplier, val executor: RedisExecutor) extends Redis {
protected def lift[A](in: UIO[IO[RedisError, A]]): GenRedis.Sync[A] = GenRedis.sync(in)
}

private final class AsyncLive(val codecSupplier: CodecSupplier, val executor: RedisExecutor) extends AsyncRedis {
protected def lift[A](in: UIO[IO[RedisError, A]]): GenRedis.Async[A] = GenRedis.async(in)
}
}
16 changes: 8 additions & 8 deletions modules/redis/src/main/scala/zio/redis/ResultBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,20 @@ object ResultBuilder {
@annotation.implicitNotFound("Use `returning[A]` to specify method's return type")
final abstract class NeedsReturnType

trait ResultBuilder1[+F[_]] extends ResultBuilder {
def returning[R: Schema]: IO[RedisError, F[R]]
trait ResultBuilder1[+F[_], G[+_]] extends ResultBuilder {
def returning[R: Schema]: G[F[R]]
}

trait ResultBuilder2[+F[_, _]] extends ResultBuilder {
def returning[R1: Schema, R2: Schema]: IO[RedisError, F[R1, R2]]
trait ResultBuilder2[+F[_, _], G[+_]] extends ResultBuilder {
def returning[R1: Schema, R2: Schema]: G[F[R1, R2]]
}

trait ResultBuilder3[+F[_, _, _]] extends ResultBuilder {
def returning[R1: Schema, R2: Schema, R3: Schema]: IO[RedisError, F[R1, R2, R3]]
trait ResultBuilder3[+F[_, _, _], G[+_]] extends ResultBuilder {
def returning[R1: Schema, R2: Schema, R3: Schema]: G[F[R1, R2, R3]]
}

trait ResultOutputBuilder extends ResultBuilder {
def returning[R: Output]: IO[RedisError, R]
trait ResultOutputBuilder[G[+_]] extends ResultBuilder {
def returning[R: Output]: G[R]
}

trait ResultStreamBuilder1[+F[_]] extends ResultBuilder {
Expand Down
40 changes: 18 additions & 22 deletions modules/redis/src/main/scala/zio/redis/api/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@

package zio.redis.api

import zio.Chunk
import zio.redis.Input._
import zio.redis.Output.{ChunkOutput, ClusterPartitionOutput, UnitOutput}
import zio.redis._
import zio.redis.api.Cluster.{AskingCommand, ClusterSetSlots, ClusterSlots}
import zio.redis.internal.{RedisCommand, RedisEnvironment, RedisExecutor}
import zio.redis.api.Cluster.{ClusterSetSlots, ClusterSlots, askingCommand}
import zio.redis.internal.{RedisCommand, RedisEnvironment}
import zio.redis.options.Cluster.SetSlotSubCommand._
import zio.redis.options.Cluster.{Partition, Slot}
import zio.{Chunk, IO}

trait Cluster extends RedisEnvironment {
trait Cluster[G[+_]] extends RedisEnvironment[G] {

/**
* When a cluster client receives an -ASK redirect, the ASKING command is sent to the target node followed by the
Expand All @@ -34,8 +33,8 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
final def asking: IO[RedisError, Unit] =
AskingCommand(executor).run(())
final def asking: G[Unit] =
askingCommand.run(())

/**
* Set a hash slot in importing state. Command should be executed on the node where hash slot will be migrated
Expand All @@ -48,12 +47,11 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
final def setSlotImporting(slot: Slot, nodeId: String): IO[RedisError, Unit] = {
final def setSlotImporting(slot: Slot, nodeId: String): G[Unit] = {
val command = RedisCommand(
ClusterSetSlots,
Tuple3(LongInput, ArbitraryValueInput[String](), ArbitraryValueInput[String]()),
UnitOutput,
executor
UnitOutput
)
command.run((slot.number, Importing.asString, nodeId))
}
Expand All @@ -69,12 +67,11 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
final def setSlotMigrating(slot: Slot, nodeId: String): IO[RedisError, Unit] = {
final def setSlotMigrating(slot: Slot, nodeId: String): G[Unit] = {
val command = RedisCommand(
ClusterSetSlots,
Tuple3(LongInput, ArbitraryValueInput[String](), ArbitraryValueInput[String]()),
UnitOutput,
executor
UnitOutput
)
command.run((slot.number, Migrating.asString, nodeId))
}
Expand All @@ -90,12 +87,11 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
final def setSlotNode(slot: Slot, nodeId: String): IO[RedisError, Unit] = {
final def setSlotNode(slot: Slot, nodeId: String): G[Unit] = {
val command = RedisCommand(
ClusterSetSlots,
Tuple3(LongInput, ArbitraryValueInput[String](), ArbitraryValueInput[String]()),
UnitOutput,
executor
UnitOutput
)
command.run((slot.number, Node.asString, nodeId))
}
Expand All @@ -108,9 +104,9 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
final def setSlotStable(slot: Slot): IO[RedisError, Unit] = {
final def setSlotStable(slot: Slot): G[Unit] = {
val command =
RedisCommand(ClusterSetSlots, Tuple2(LongInput, ArbitraryValueInput[String]()), UnitOutput, executor)
RedisCommand(ClusterSetSlots, Tuple2(LongInput, ArbitraryValueInput[String]()), UnitOutput)
command.run((slot.number, Stable.asString))
}

Expand All @@ -120,8 +116,8 @@ trait Cluster extends RedisEnvironment {
* @return
* details about which cluster
*/
final def slots: IO[RedisError, Chunk[Partition]] = {
val command = RedisCommand(ClusterSlots, NoInput, ChunkOutput(ClusterPartitionOutput), executor)
final def slots: G[Chunk[Partition]] = {
val command = RedisCommand(ClusterSlots, NoInput, ChunkOutput(ClusterPartitionOutput))
command.run(())
}
}
Expand All @@ -131,6 +127,6 @@ private[redis] object Cluster {
final val ClusterSetSlots = "CLUSTER SETSLOT"
final val ClusterSlots = "CLUSTER SLOTS"

final val AskingCommand: RedisExecutor => RedisCommand[Unit, Unit] =
(executor: RedisExecutor) => RedisCommand(Asking, NoInput, UnitOutput, executor)
final val askingCommand: RedisCommand[Unit, Unit] =
RedisCommand(Asking, NoInput, UnitOutput)
}
27 changes: 13 additions & 14 deletions modules/redis/src/main/scala/zio/redis/api/Connection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@

package zio.redis.api

import zio._
import zio.redis.Input._
import zio.redis.Output._
import zio.redis._
import zio.redis.internal.{RedisCommand, RedisEnvironment}

trait Connection extends RedisEnvironment {
trait Connection[G[+_]] extends RedisEnvironment[G] {
import Connection.{Auth => _, _}

/**
Expand All @@ -37,8 +36,8 @@ trait Connection extends RedisEnvironment {
* if the password provided via AUTH matches the password in the configuration file, the Unit value is returned and
* the server starts accepting commands. Otherwise, an error is returned and the client needs to try a new password.
*/
final def auth(password: String): IO[RedisError, Unit] = {
val command = RedisCommand(Connection.Auth, AuthInput, UnitOutput, executor)
final def auth(password: String): G[Unit] = {
val command = RedisCommand(Connection.Auth, AuthInput, UnitOutput)

command.run(Auth(None, password))
}
Expand All @@ -54,8 +53,8 @@ trait Connection extends RedisEnvironment {
* if the password provided via AUTH matches the password in the configuration file, the Unit value is returned and
* the server starts accepting commands. Otherwise, an error is returned and the client needs to try a new password.
*/
final def auth(username: String, password: String): IO[RedisError, Unit] = {
val command = RedisCommand(Connection.Auth, AuthInput, UnitOutput, executor)
final def auth(username: String, password: String): G[Unit] = {
val command = RedisCommand(Connection.Auth, AuthInput, UnitOutput)

command.run(Auth(Some(username), password))
}
Expand All @@ -66,8 +65,8 @@ trait Connection extends RedisEnvironment {
* @return
* the connection name, or None if a name wasn't set.
*/
final def clientGetName: IO[RedisError, Option[String]] = {
val command = RedisCommand(ClientGetName, NoInput, OptionalOutput(MultiStringOutput), executor)
final def clientGetName: G[Option[String]] = {
val command = RedisCommand(ClientGetName, NoInput, OptionalOutput(MultiStringOutput))

command.run(())
}
Expand All @@ -82,8 +81,8 @@ trait Connection extends RedisEnvironment {
* @return
* the ID of the current connection.
*/
final def clientId: IO[RedisError, Long] = {
val command = RedisCommand(ClientId, NoInput, LongOutput, executor)
final def clientId: G[Long] = {
val command = RedisCommand(ClientId, NoInput, LongOutput)

command.run(())
}
Expand All @@ -96,8 +95,8 @@ trait Connection extends RedisEnvironment {
* @return
* the Unit value.
*/
final def clientSetName(name: String): IO[RedisError, Unit] = {
val command = RedisCommand(ClientSetName, StringInput, UnitOutput, executor)
final def clientSetName(name: String): G[Unit] = {
val command = RedisCommand(ClientSetName, StringInput, UnitOutput)

command.run(name)
}
Expand All @@ -112,8 +111,8 @@ trait Connection extends RedisEnvironment {
* @return
* the Unit value.
*/
final def select(index: Long): IO[RedisError, Unit] = {
val command = RedisCommand(Select, LongInput, UnitOutput, executor)
final def select(index: Long): G[Unit] = {
val command = RedisCommand(Select, LongInput, UnitOutput)

command.run(index)
}
Expand Down
Loading