Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aspect oriented programming as middleware #668

Open
ValdemarGr opened this issue Sep 5, 2023 · 1 comment
Open

Aspect oriented programming as middleware #668

ValdemarGr opened this issue Sep 5, 2023 · 1 comment

Comments

@ValdemarGr
Copy link
Contributor

ValdemarGr commented Sep 5, 2023

Grpc has support for interceptors on both client and server.
Having such tools in this library could facilitate improvements in usage (for instance tracing and authorization).

The ideas proposed here would also cover other similar issues (#627, #6, #567).

I have taken inspiration from cats-tagless for the Dom and Cod typeclasses.
https://github.com/typelevel/cats-tagless/blob/master/core/src/main/scala/cats/tagless/aop/Aspect.scala

The following example covers the server-side of an implementation, but a client aspect is very similar.

If you are interested I could put together a PR for this?

runtime module

import cats._
import cats.implicits._

// shared between client and server
case class CallContext[Req, Res](
  metadata: Metadata,
  methodDescriptor: io.grpc.MethodDescriptor[Req, Res]
)
trait ServiceAspect[F[_], Dom[_], Cod[_], A] { self =>
  def visitUnaryToUnary[Req, Res](
    callCtx: CallContext[Req, Res],
    dom: Dom[Req],
    cod: Cod[Res],
    request: (Req, A) => F[Res]
  ): Req => F[Res]

  def visitUnaryToStreaming[Req, Res](
    callCtx: CallContext[Req, Res],
    dom: Dom[Req],
    cod: Cod[Res],
    request: (Req, A) => fs2.Stream[F, Res]
  ): Req => fs2.Stream[F, Res]

  def visitStreamingToUnary[Req, Res](
    callCtx: CallContext[Req, Res],
    dom: Dom[Req],
    cod: Cod[Res],
    request: (fs2.Stream[F, Req], A) => F[Res]
  ): fs2.Stream[F, Req] => F[Res]

  def visitStreamingToStreaming[Req, Res](
    callCtx: CallContext[Req, Res],
    dom: Dom[Req],
    cod: Cod[Res],
    request: (fs2.Stream[F, Req], A) => fs2.Stream[F, Res]
  ): fs2.Stream[F, Req] => fs2.Stream[F, Res]

  def modify[B](f: A => F[B])(implicit F: Monad[F]): ServiceAspect[F, Dom, Cod, B] =
    new ServiceAspect[F, Dom, Cod, B] {
      override def visitUnaryToUnary[Req, Res](
        callCtx: CallContext[Req, Res],
        dom: Dom[Req],
        cod: Cod[Res],
        request: (Req, B) => F[Res]
      ): Req => F[Res] =
        self.visitUnaryToUnary[Req, Res](
          callCtx,
          dom,
          cod,
          (req, a) => f(a).flatMap(request(req, _))
        )

      override def visitUnaryToStreaming[Req, Res](
        callCtx: CallContext[Req, Res],
        dom: Dom[Req],
        cod: Cod[Res],
        request: (Req, B) => Stream[F, Res]
      ): Req => Stream[F, Res] =
        self.visitUnaryToStreaming[Req, Res](
          callCtx,
          dom,
          cod,
          (req, a) => fs2.Stream.eval(f(a)).flatMap(request(req, _))
        )

      override def visitStreamingToUnary[Req, Res](
        callCtx: CallContext[Req, Res],
        dom: Dom[Req],
        cod: Cod[Res],
        request: (Stream[F, Req], B) => F[Res]
      ): Stream[F, Req] => F[Res] =
        self.visitStreamingToUnary[Req, Res](
          callCtx,
          dom,
          cod,
          (req, a) => f(a).flatMap(request(req, _))
        )

      override def visitStreamingToStreaming[Req, Res](
        callCtx: CallContext[Req, Res],
        dom: Dom[Req],
        cod: Cod[Res],
        request: (Stream[F, Req], B) => Stream[F, Res]
      ): Stream[F, Req] => Stream[F, Res] =
        self.visitStreamingToStreaming[Req, Res](
          callCtx,
          dom,
          cod,
          (req, a) => fs2.Stream.eval(f(a)).flatMap(request(req, _))
        )
    }
}

// https://github.com/typelevel/cats-tagless/blob/master/core/src/main/scala/cats/tagless/Trivial.scala
final class Trivial[A] private extends Serializable

object Trivial {
  private val any                      = new Trivial[Any]
  implicit def instance[A]: Trivial[A] = any.asInstanceOf[Trivial[A]]
}

object ServiceAspect {
  def default[F[_], Dom[_], Cod[_]] = new ServiceAspect[F, Dom, Cod, io.grpc.Metadata] {
    override def visitUnaryToUnary[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (Req, Metadata) => F[Res]
    ): Req => F[Res] = request(_, callCtx.metadata)

    override def visitUnaryToStreaming[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (Req, Metadata) => Stream[F, Res]
    ): Req => Stream[F, Res] = request(_, callCtx.metadata)

    override def visitStreamingToUnary[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (Stream[F, Req], Metadata) => F[Res]
    ): Stream[F, Req] => F[Res] = request(_, callCtx.metadata)

    override def visitStreamingToStreaming[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (Stream[F, Req], Metadata) => Stream[F, Res]
    ): Stream[F, Req] => Stream[F, Res] = request(_, callCtx.metadata)
  }
}

codegen

case class Request()
case class Response()
case class StreamingResponse()

trait SomeServiceFs2Grpc[F[_], A] {
  def doSomething(request: Request, ctx: A): F[Response]
  def doSomethingStream(request: Request, ctx: A): fs2.Stream[F, StreamingResponse]
}
object SomeServiceFs2Grpc {
  def serviceBinding[F[_]: Async, Dom[_], Cod[_], A](
    dispatcher: Dispatcher[F],
    impl: SomeServiceFs2Grpc[F, A],
    serverOptions: ServerOptions,
    aspect: ServiceAspect[F, Dom, Cod, A]
  )(
    implicit domRequest: Dom[Request],
    codResponse: Cod[Response],
    codStreamingResponse: Cod[StreamingResponse]
  ) =
    io.grpc.ServerServiceDefinition
      .builder(SomeService.SERVICE)
      .addMethod(
        SomeService.METHOD_DO_SOMETHING,
        fs2.grpc.server
          .Fs2ServerCallHandler[F](dispatcher, serverOptions)
          .unaryToUnaryCall[Request, Response] { (r, m) =>
            aspect
              .visitUnaryToUnary[Request, Response](
                CallContext(m, SomeService.METHOD_DO_SOMETHING),
                domRequest,
                codResponse,
                (r, a) => impl.doSomething(r, a)
              )
              .apply(r)
          }
      )
      .addMethod(
        SomeService.METHOD_DO_SOMETHING_STREAM,
        fs2.grpc.server
          .Fs2ServerCallHandler[F](dispatcher, serverOptions)
          .unaryToStreamingCall[Request, StreamingResponse] { (r, m) =>
            aspect
              .visitUnaryToStreaming[Request, StreamingResponse](
                CallContext(m, SomeService.METHOD_DO_SOMETHING_STREAM),
                domRequest,
                codStreamingResponse,
                (r, a) => impl.doSomethingStream(r, a)
              )
              .apply(r)
          }
      )
      .build()
}

object SomeService {
  val METHOD_DO_SOMETHING: io.grpc.MethodDescriptor[Request, Response] = ???

  val METHOD_DO_SOMETHING_STREAM: io.grpc.MethodDescriptor[Request, StreamingResponse] = ???

  val SERVICE: io.grpc.ServiceDescriptor = ???
}

userland

val mySimpleImpl: SomeServiceFs2Grpc[IO, Metadata] = ???

val disp: Dispatcher[IO] = ???
SomeServiceFs2Grpc.serviceBinding(
  disp,
  mySimpleImpl,
  ServerOptions.default,
  ServiceAspect.default[IO, Trivial, Trivial]
)

// more complex usecase
type Auth = String
val myAuthedImpl: SomeServiceFs2Grpc[IO, Auth]     = ???
def extractAuthFromMetadata(m: Metadata): IO[Auth] = ???

SomeServiceFs2Grpc.serviceBinding(
  disp,
  myAuthedImpl,
  ServerOptions.default,
  ServiceAspect.default[IO, Trivial, Trivial].modify(extractAuthFromMetadata)
)

import natchez._
trait TracingAspect[A] {
  def key: String
  def value: String
}

def traceAspect[F[_]: Applicative: Trace, Cod[_], A](underlying: ServiceAspect[F, TracingAspect, Cod, A]) = {
  type Dom[A] = TracingAspect[A]
  new ServiceAspect[F, Dom, Cod, A] {
    override def visitUnaryToUnary[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (Req, A) => F[Res]
    ): Req => F[Res] = req =>
      Trace[F].span(callCtx.methodDescriptor.getFullMethodName()) {
        Trace[F].put(dom.key -> dom.value) *>
          underlying.visitUnaryToUnary[Req, Res](
            callCtx,
            dom,
            cod,
            (req, a) => request(req, a)
          )(req)
      }

    override def visitUnaryToStreaming[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (Req, A) => Stream[F, Res]
    ): Req => Stream[F, Res] = ???

    override def visitStreamingToUnary[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (Stream[F, Req], A) => F[Res]
    ): Stream[F, Req] => F[Res] = ???

    override def visitStreamingToStreaming[Req, Res](
      callCtx: CallContext[Req, Res],
      dom: Dom[Req],
      cod: Cod[Res],
      request: (Stream[F, Req], A) => Stream[F, Res]
    ): Stream[F, Req] => Stream[F, Res] = ???

  }
}
@ahjohannessen
Copy link
Collaborator

If you are interested I could put together a PR for this?

I think this looks promising. You are welcome to put together a PR and we can take it from there :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants