Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add service binding for mkCtx returning Resource #567

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

RyanJHeld
Copy link

This PR adds the method serviceBindingResource to a service implementation's generated companion. The method is just like serviceBinding, but the signature on mkCtx is Metadata => Resource[F, A] instead of Metadata => F[A].

The motivation here is to propagate spans for tracing. For example, the library Natchez allows a span to propagate from one machine to another by converting a span into a kernel, which is just a collection of http headers (see docs). On the server-side, the trace can be continued by extracting the kernel and creating a Resource[F, Span[F]]:

def mkCtx[F[_]](m: Metadata): Resource[F, Span[F]] = 
  for {
    ep <- entryPoint[F]
    kern = metadataToKernel(m)
    span <- ep.continue("span name", kern)
  } yield span

The "release" method for this resource is how the span gets sent back to the collector, so it's important that the resource isn't released until after the service call has completed (we can't just call _.use(_.pure[F])). Currently, the generated code for serviceBinding implements each service method as (r, m) => mkCtx(m).flatMap(serviceImpl.methodName(r, _)) (for non-streaming methods). This implementation doesn't allow a trace to be propagated without manually editing each service method. The generated serviceBindingResource (added in this PR) gives the alternate implementation (r, m) => mkCtx(m).use(serviceImpl.methodName(r, _)), allowing trace propagation without additional code.

val handler = s"$Fs2ServerCallHandler[F](dispatcher, serverOptions).${handleMethod(method)}[$inType, $outType]"

val serviceCall = s"serviceImpl.${method.name}"
val eval = if (method.isServerStreaming) s"$Stream.resource[F, A](mkCtx(m)).flatMap" else "mkCtx(m).use"
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really the crux of the PR. I'd like to cut down on the code duplication I just added to this file, but wanted to get general feedback on the PR before spending any time on that.

f: Metadata => Resource[F, A],
serverOptions: ServerOptions
): Resource[F, ServerServiceDefinition] =
Dispatcher[F].map(serviceBindingResource[F, A](_, serviceImpl, f, serverOptions))
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't able to add the error handling that the above method has due to the different signature.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about something like:

  final def service[F[_]: Async, A](
      dispatcher: Dispatcher[F],
      serviceImpl: Service[F, A],
      f: Metadata => F[A],
      serverOptions: ServerOptions
  ): ServerServiceDefinition = {

    val mkCtx: Metadata => F[A] = f(_).handleErrorWith {
      case e: StatusException => e.raiseError[F, A]
      case e: StatusRuntimeException => e.raiseError[F, A]
      case e: Throwable => Status.INTERNAL.withDescription(e.getMessage).asRuntimeException().raiseError[F, A]
    }

    val lifted: Metadata => Resource[F, A] = m => Resource.eval(mkCtx(m))

    serviceBindingResource[F, A](dispatcher, serviceImpl, lifted, serverOptions)
  }

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we are on 2.5.0-RC1 we can make a breaking change with respect to this method:

  protected def serviceBinding[F[_]: Async, A](
      dispatcher: Dispatcher[F],
      serviceImpl: Service[F, A],
      mkCtx: Metadata => F[A],
      serverOptions: ServerOptions
  ): ServerServiceDefinition

as all the other ones can be implemented in terms of:

  protected def serviceBinding[F[_]: Async, A](
    dispatcher: Dispatcher[F],
    serviceImpl: Service[F, A],
    mkCtx: Metadata => Resource[F, A],
    serverOptions: ServerOptions
): ServerServiceDefinition

Having the code gen generate only on method to implement that takes that resource thing. The adapter is:

val lifted: Metadata => Resource[F, A] = m => Resource.eval(mkCtx(m))

Copy link
Collaborator

@ahjohannessen ahjohannessen Sep 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Furthermore, mkCtx: Metadata => Resource[F, A] is more general and not that much of a breaking change for users if we change all the signature to express the acquisition of A in terms of a Resource[F, A] rather than F.

@nadavwr
Copy link

nadavwr commented Jun 10, 2024

This is exactly the kind of change we need for tracing instrumentation. We use Kamon, but being able to surround a unary server call with a Resource[F, *] mkCtx is working well for us too. Just providing a mkCtx: Metadata => Resource[F, A] will be a lifesaver for us—it will enable the instrumentation we need without some truly horrible boilerplate we currently need.

Right now we have to use ResourceIO for our F, forcing users to add Resource-related boilerplate to every server-side method they implement. We also need to supply a specialized Dispatcher[ResourceIO] implementation that evaluates cleanups at end of runs (surprisingly Dispatcher.parallel[ResourceIO] will accumulate cleanups and evaluate them only during dispatcher shutdown).

What can I do to help move this along?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants