-
Notifications
You must be signed in to change notification settings - Fork 57
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add service binding for mkCtx returning Resource #567
base: main
Are you sure you want to change the base?
Conversation
val handler = s"$Fs2ServerCallHandler[F](dispatcher, serverOptions).${handleMethod(method)}[$inType, $outType]" | ||
|
||
val serviceCall = s"serviceImpl.${method.name}" | ||
val eval = if (method.isServerStreaming) s"$Stream.resource[F, A](mkCtx(m)).flatMap" else "mkCtx(m).use" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really the crux of the PR. I'd like to cut down on the code duplication I just added to this file, but wanted to get general feedback on the PR before spending any time on that.
f: Metadata => Resource[F, A], | ||
serverOptions: ServerOptions | ||
): Resource[F, ServerServiceDefinition] = | ||
Dispatcher[F].map(serviceBindingResource[F, A](_, serviceImpl, f, serverOptions)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't able to add the error handling that the above method has due to the different signature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about something like:
final def service[F[_]: Async, A](
dispatcher: Dispatcher[F],
serviceImpl: Service[F, A],
f: Metadata => F[A],
serverOptions: ServerOptions
): ServerServiceDefinition = {
val mkCtx: Metadata => F[A] = f(_).handleErrorWith {
case e: StatusException => e.raiseError[F, A]
case e: StatusRuntimeException => e.raiseError[F, A]
case e: Throwable => Status.INTERNAL.withDescription(e.getMessage).asRuntimeException().raiseError[F, A]
}
val lifted: Metadata => Resource[F, A] = m => Resource.eval(mkCtx(m))
serviceBindingResource[F, A](dispatcher, serviceImpl, lifted, serverOptions)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we are on 2.5.0-RC1 we can make a breaking change with respect to this method:
protected def serviceBinding[F[_]: Async, A](
dispatcher: Dispatcher[F],
serviceImpl: Service[F, A],
mkCtx: Metadata => F[A],
serverOptions: ServerOptions
): ServerServiceDefinition
as all the other ones can be implemented in terms of:
protected def serviceBinding[F[_]: Async, A](
dispatcher: Dispatcher[F],
serviceImpl: Service[F, A],
mkCtx: Metadata => Resource[F, A],
serverOptions: ServerOptions
): ServerServiceDefinition
Having the code gen generate only on method to implement that takes that resource thing. The adapter is:
val lifted: Metadata => Resource[F, A] = m => Resource.eval(mkCtx(m))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Furthermore, mkCtx: Metadata => Resource[F, A]
is more general and not that much of a breaking change for users if we change all the signature to express the acquisition of A
in terms of a Resource[F, A]
rather than F
.
This is exactly the kind of change we need for tracing instrumentation. We use Kamon, but being able to surround a unary server call with a Right now we have to use What can I do to help move this along? |
This PR adds the method
serviceBindingResource
to a service implementation's generated companion. The method is just likeserviceBinding
, but the signature onmkCtx
isMetadata => Resource[F, A]
instead ofMetadata => F[A]
.The motivation here is to propagate spans for tracing. For example, the library
Natchez
allows a span to propagate from one machine to another by converting a span into a kernel, which is just a collection of http headers (see docs). On the server-side, the trace can be continued by extracting the kernel and creating aResource[F, Span[F]]
:The "release" method for this resource is how the span gets sent back to the collector, so it's important that the resource isn't released until after the service call has completed (we can't just call
_.use(_.pure[F])
). Currently, the generated code forserviceBinding
implements each service method as(r, m) => mkCtx(m).flatMap(serviceImpl.methodName(r, _))
(for non-streaming methods). This implementation doesn't allow a trace to be propagated without manually editing each service method. The generatedserviceBindingResource
(added in this PR) gives the alternate implementation(r, m) => mkCtx(m).use(serviceImpl.methodName(r, _))
, allowing trace propagation without additional code.