Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[548] Expose Redis operations via Redis trait #727

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions example/src/main/scala/example/contributorsCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ final case class ContributorsCacheLive(r: Redis, s: Sttp) extends ContributorsCa

private def read(repository: Repository): ZIO[Redis, ApiError, Contributors] =
ZIO
.serviceWithZIO[Redis](
_.get(repository.key)
.returning[String]
)
.serviceWithZIO[Redis](_.get(repository.key).returning[String])
.someOrFail(ApiError.CacheMiss(repository.key))
.map(_.fromJson[Contributors])
.foldZIO(_ => ZIO.fail(ApiError.CorruptedData), s => ZIO.succeed(s.getOrElse(Contributors(Chunk.empty))))
Expand Down
2 changes: 1 addition & 1 deletion redis/src/main/scala/zio/redis/ClusterExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ final case class ClusterExecutor(
def executeAsk(address: RedisUri) =
for {
executor <- executor(address)
_ <- executor.execute(AskingCommand.resp((), StringUtf8Codec))
_ <- executor.execute(AskingCommand(this, StringUtf8Codec).resp(()))
res <- executor.execute(command)
} yield res

Expand Down
9 changes: 0 additions & 9 deletions redis/src/main/scala/zio/redis/CommandExecutor.scala

This file was deleted.

26 changes: 12 additions & 14 deletions redis/src/main/scala/zio/redis/RedisCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,25 @@ import zio._
import zio.redis.Input.{StringInput, Varargs}
import zio.schema.codec.BinaryCodec

final class RedisCommand[-In, +Out] private (val name: String, val input: Input[In], val output: Output[Out]) {
private[redis] def run(in: In): ZIO[Redis, RedisError, Out] =
ZIO
.serviceWithZIO[Redis] { redis =>
redis.executor
.execute(resp(in, redis.codec))
.flatMap[Any, Throwable, Out](out => ZIO.attempt(output.unsafeDecode(out)(redis.codec)))
}
.refineToOrDie[RedisError]
final class RedisCommand[-In, +Out] private (val name: String, val input: Input[In], val output: Output[Out])(
val executor: RedisExecutor,
val codec: BinaryCodec
) {

private[redis] def run(in: In)(implicit executor: RedisExecutor, codec: BinaryCodec): ZIO[Any, RedisError, Out] =
private[redis] def run(in: In): IO[RedisError, Out] =
executor
.execute(resp(in, codec))
.execute(resp(in))
.flatMap[Any, Throwable, Out](out => ZIO.attempt(output.unsafeDecode(out)(codec)))
.refineToOrDie[RedisError]

private[redis] def resp(in: In, codec: BinaryCodec): Chunk[RespValue.BulkString] =
private[redis] def resp(in: In): Chunk[RespValue.BulkString] =
Varargs(StringInput).encode(name.split(" "))(codec) ++ input.encode(in)(codec)
}

object RedisCommand {
private[redis] def apply[In, Out](name: String, input: Input[In], output: Output[Out]): RedisCommand[In, Out] =
new RedisCommand(name, input, output)
private[redis] def apply[In, Out](name: String, input: Input[In], output: Output[Out])(
executor: RedisExecutor,
codec: BinaryCodec
Copy link
Member

@mijicd mijicd Jan 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this parameter group. Let's merge them in the first one (sorry 🙏 )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

): RedisCommand[In, Out] =
new RedisCommand(name, input, output)(executor, codec)
}
8 changes: 8 additions & 0 deletions redis/src/main/scala/zio/redis/RedisEnvironment.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package zio.redis

import zio.schema.codec.BinaryCodec

trait RedisEnvironment {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should make it package-private. After all, it is an implementation detail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, makes sense

def codec: BinaryCodec
def executor: RedisExecutor
}
33 changes: 18 additions & 15 deletions redis/src/main/scala/zio/redis/api/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import zio.redis._
import zio.redis.api.Cluster.{AskingCommand, ClusterSetSlots, ClusterSlots}
import zio.redis.options.Cluster.SetSlotSubCommand._
import zio.redis.options.Cluster.{Partition, Slot}
import zio.{Chunk, ZIO}
import zio.schema.codec.BinaryCodec
import zio.{Chunk, IO}

trait Cluster extends CommandExecutor {
trait Cluster extends RedisEnvironment {

/**
* When a cluster client receives an -ASK redirect, the ASKING command is sent to the target node followed by the
Expand All @@ -33,17 +34,17 @@ trait Cluster extends CommandExecutor {
* @return
* the Unit value.
*/
def asking: ZIO[Any, RedisError, Unit] =
AskingCommand.run(())
def asking: IO[RedisError, Unit] =
AskingCommand(executor, codec).run(())

/**
* Returns details about which cluster slots map to which Redis instances.
*
* @return
* details about which cluster
*/
def slots: ZIO[Any, RedisError, Chunk[Partition]] = {
val command = RedisCommand(ClusterSlots, NoInput, ChunkOutput(ClusterPartitionOutput))
def slots: IO[RedisError, Chunk[Partition]] = {
val command = RedisCommand(ClusterSlots, NoInput, ChunkOutput(ClusterPartitionOutput))(executor, codec)
command.run(())
}

Expand All @@ -55,8 +56,9 @@ trait Cluster extends CommandExecutor {
* @return
* the Unit value.
*/
def setSlotStable(slot: Slot): ZIO[Any, RedisError, Unit] = {
val command = RedisCommand(ClusterSetSlots, Tuple2(LongInput, ArbitraryInput[String]()), UnitOutput)
def setSlotStable(slot: Slot): IO[RedisError, Unit] = {
val command =
RedisCommand(ClusterSetSlots, Tuple2(LongInput, ArbitraryInput[String]()), UnitOutput)(executor, codec)
command.run((slot.number, Stable.stringify))
}

Expand All @@ -71,12 +73,12 @@ trait Cluster extends CommandExecutor {
* @return
* the Unit value.
*/
def setSlotMigrating(slot: Slot, nodeId: String): ZIO[Any, RedisError, Unit] = {
def setSlotMigrating(slot: Slot, nodeId: String): IO[RedisError, Unit] = {
val command = RedisCommand(
ClusterSetSlots,
Tuple3(LongInput, ArbitraryInput[String](), ArbitraryInput[String]()),
UnitOutput
)
)(executor, codec)
command.run((slot.number, Migrating.stringify, nodeId))
}

Expand All @@ -91,12 +93,12 @@ trait Cluster extends CommandExecutor {
* @return
* the Unit value.
*/
def setSlotImporting(slot: Slot, nodeId: String): ZIO[Any, RedisError, Unit] = {
def setSlotImporting(slot: Slot, nodeId: String): IO[RedisError, Unit] = {
val command = RedisCommand(
ClusterSetSlots,
Tuple3(LongInput, ArbitraryInput[String](), ArbitraryInput[String]()),
UnitOutput
)
)(executor, codec)
command.run((slot.number, Importing.stringify, nodeId))
}

Expand All @@ -111,12 +113,12 @@ trait Cluster extends CommandExecutor {
* @return
* the Unit value.
*/
def setSlotNode(slot: Slot, nodeId: String): ZIO[Any, RedisError, Unit] = {
def setSlotNode(slot: Slot, nodeId: String): IO[RedisError, Unit] = {
val command = RedisCommand(
ClusterSetSlots,
Tuple3(LongInput, ArbitraryInput[String](), ArbitraryInput[String]()),
UnitOutput
)
)(executor, codec)
command.run((slot.number, Node.stringify, nodeId))
}
}
Expand All @@ -126,5 +128,6 @@ private[redis] object Cluster {
final val ClusterSlots = "CLUSTER SLOTS"
final val ClusterSetSlots = "CLUSTER SETSLOT"

final val AskingCommand: RedisCommand[Unit, Unit] = RedisCommand(Asking, NoInput, UnitOutput)
final val AskingCommand: (RedisExecutor, BinaryCodec) => RedisCommand[Unit, Unit] =
RedisCommand(Asking, NoInput, UnitOutput)(_, _)
}
Loading