Skip to content

Commit

Permalink
Implement federation tracing (ghostdogpr#701)
Browse files Browse the repository at this point in the history
* Add federation tracing

* Implement apollo federation tracing

* Refactor federation tracing switch. Add support to all adapters

* Address review comments
  • Loading branch information
paulpdaniels authored Jan 10, 2021
1 parent c60b1c9 commit 6340345
Show file tree
Hide file tree
Showing 13 changed files with 787 additions and 47 deletions.
11 changes: 9 additions & 2 deletions adapters/akka-http/src/main/scala/caliban/AkkaHttpAdapter.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package caliban

import akka.http.scaladsl.model.MediaTypes.`application/json`
import akka.http.scaladsl.model.ws.{ Message, TextMessage }
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws.{ Message, TextMessage }
import akka.http.scaladsl.server.Directives.{ complete, extractRequestContext }
import akka.http.scaladsl.server.{ RequestContext, Route }
import akka.http.scaladsl.unmarshalling.FromEntityUnmarshaller
Expand Down Expand Up @@ -67,6 +67,13 @@ trait AkkaHttpAdapter {
)
.map(gqlResult => HttpResponse(StatusCodes.OK, entity = HttpEntity(`application/json`, gqlResult)))

private def supportFederatedTracing(context: RequestContext, request: GraphQLRequest): GraphQLRequest =
if (context.request.headers.exists(h =>
h.is(GraphQLRequest.`apollo-federation-include-trace`) && h.value() == GraphQLRequest.ftv1
)) {
request.withFederatedTracing
} else request

def completeRequest[R, E](
interpreter: GraphQLInterpreter[R, E],
skipValidation: Boolean = false,
Expand All @@ -81,7 +88,7 @@ trait AkkaHttpAdapter {
contextWrapper(ctx) {
executeHttpResponse(
interpreter,
request,
supportFederatedTracing(ctx, request),
skipValidation = skipValidation,
enableIntrospection = enableIntrospection,
queryExecution = queryExecution
Expand Down
48 changes: 27 additions & 21 deletions adapters/finch/src/main/scala/caliban/FinchAdapter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import caliban.Value.NullValue
import caliban.execution.QueryExecution
import io.circe.Decoder.Result
import io.circe.Json
import io.circe.syntax._
import io.circe.parser._
import io.circe.syntax._
import io.finch._
import shapeless._
import zio.interop.catz._
Expand Down Expand Up @@ -93,26 +93,32 @@ object FinchAdapter extends Endpoint.Module[Task] {
enableIntrospection: Boolean = true,
queryExecution: QueryExecution = QueryExecution.Parallel
)(implicit runtime: Runtime[R]): Endpoint[Task, Json :+: Json :+: CNil] =
post(queryParams :: stringBodyOption :: header("content-type")) {
(queryRequest: GraphQLRequest, body: Option[String], contentType: String) =>
val queryTask = (queryRequest, body, contentType) match {
case (_, Some(bodyValue), "application/json") =>
Task.fromEither(parse(bodyValue).flatMap(_.as[GraphQLRequest]))
case (_, Some(_), "application/graphql") =>
Task(GraphQLRequest(body))
case (queryRequest, _, _) if queryRequest.query.isDefined =>
Task(queryRequest)
// treat unmatched content-type as same as None of body.
case _ =>
Task.fail(new Exception("Query was not found"))
}
runtime
.unsafeRunToFuture(
queryTask
.flatMap(createRequest(_, interpreter, skipValidation, enableIntrospection, queryExecution))
.catchAll(error => Task(Ok(GraphQLResponse(NullValue, List(error.getMessage)).asJson)))
)
.future
post(
queryParams :: stringBodyOption :: header("content-type") :: headerOption(
GraphQLRequest.`apollo-federation-include-trace`
)
) { (queryRequest: GraphQLRequest, body: Option[String], contentType: String, federatedTracing: Option[String]) =>
val queryTask = (queryRequest, body, contentType) match {
case (_, Some(bodyValue), "application/json") =>
Task.fromEither(parse(bodyValue).flatMap(_.as[GraphQLRequest]))
case (_, Some(_), "application/graphql") =>
Task(GraphQLRequest(body))
case (queryRequest, _, _) if queryRequest.query.isDefined =>
Task(queryRequest)
// treat unmatched content-type as same as None of body.
case _ =>
Task.fail(new Exception("Query was not found"))
}
runtime
.unsafeRunToFuture(
queryTask.map { query =>
if (federatedTracing.contains(GraphQLRequest.ftv1))
query.withFederatedTracing
else query
}.flatMap(createRequest(_, interpreter, skipValidation, enableIntrospection, queryExecution))
.catchAll(error => Task(Ok(GraphQLResponse(NullValue, List(error.getMessage)).asJson)))
)
.future
} :+: get(queryParams) { request: GraphQLRequest =>
executeRequest(
request,
Expand Down
5 changes: 4 additions & 1 deletion adapters/http4s/src/main/scala/caliban/Http4sAdapter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,12 @@ object Http4sAdapter {
} yield parsed
else
req.attemptAs[GraphQLRequest].value.absolve
queryWithTracing = req.headers
.find(r => r.name == GraphQLRequest.`apollo-federation-include-trace` && r.value == GraphQLRequest.ftv1)
.foldLeft(query)((q, _) => q.withFederatedTracing)
result <- executeToJson(
interpreter,
query,
queryWithTracing,
skipValidation = skipValidation,
enableIntrospection = enableIntrospection,
queryExecution
Expand Down
14 changes: 10 additions & 4 deletions adapters/play/src/main/scala/caliban/PlayAdapter.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package caliban

import java.util.Locale
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try
import akka.stream.scaladsl.{ Flow, Sink, Source, SourceQueueWithComplete }
import akka.stream.{ Materializer, OverflowStrategy, QueueOfferResult }
import caliban.PlayAdapter.RequestWrapper
Expand All @@ -23,6 +20,10 @@ import zio.duration.Duration
import zio.random.Random
import zio.{ random, CancelableFuture, Fiber, Has, IO, RIO, Ref, Runtime, Schedule, Task, URIO, ZIO, ZLayer }

import java.util.Locale
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try

trait PlayAdapter[R <: Has[_] with Blocking with Random] {

val `application/graphql` = "application/graphql"
Expand Down Expand Up @@ -140,6 +141,11 @@ trait PlayAdapter[R <: Has[_] with Blocking with Random] {
)
)

private def supportFederatedTracing(request: Request[GraphQLRequest]): Request[GraphQLRequest] =
if (request.headers.get(GraphQLRequest.`apollo-federation-include-trace`).contains(GraphQLRequest.ftv1)) {
request.map(_.withFederatedTracing)
} else request

private def executeRequest[E](
interpreter: GraphQLInterpreter[R, E],
request: Request[GraphQLRequest],
Expand All @@ -152,7 +158,7 @@ trait PlayAdapter[R <: Has[_] with Blocking with Random] {
requestWrapper(request)(
interpreter
.executeRequest(
request.body,
supportFederatedTracing(request).body,
skipValidation = skipValidation,
enableIntrospection = enableIntrospection,
queryExecution
Expand Down
6 changes: 6 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,12 @@ lazy val federation = project
"dev.zio" %% "zio-test-sbt" % zioVersion % Test,
compilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
),
PB.targets in Compile := Seq(
scalapb.gen(grpc = false) -> (sourceManaged in Compile).value / "scalapb"
),
libraryDependencies ++= Seq(
"com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
),
scalacOptions += "-Ywarn-unused:-locals"
)

Expand Down
15 changes: 14 additions & 1 deletion core/src/main/scala/caliban/GraphQLRequest.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package caliban

import caliban.GraphQLRequest.{ `apollo-federation-include-trace`, ftv1 }
import caliban.Value.StringValue
import caliban.interop.circe.IsCirceDecoder
import caliban.interop.play.IsPlayJsonReads
import caliban.interop.zio.IsZIOJsonDecoder
Expand All @@ -12,7 +14,15 @@ case class GraphQLRequest(
operationName: Option[String] = None,
variables: Option[Map[String, InputValue]] = None,
extensions: Option[Map[String, InputValue]] = None
)
) {

def withExtension(key: String, value: InputValue): GraphQLRequest =
copy(extensions = Some(extensions.foldLeft(Map(key -> value))(_ ++ _)))

def withFederatedTracing =
withExtension(`apollo-federation-include-trace`, StringValue(ftv1))

}

object GraphQLRequest {
implicit def circeDecoder[F[_]: IsCirceDecoder]: F[GraphQLRequest] =
Expand All @@ -21,4 +31,7 @@ object GraphQLRequest {
caliban.interop.play.json.GraphQLRequestPlayJson.graphQLRequestReads.asInstanceOf[F[GraphQLRequest]]
implicit def zioJsonDecoder[F[_]: IsZIOJsonDecoder]: F[GraphQLRequest] =
caliban.interop.zio.GraphQLRequestZioJson.graphQLRequestDecoder.asInstanceOf[F[GraphQLRequest]]

private[caliban] val ftv1 = "ftv1"
private[caliban] val `apollo-federation-include-trace` = "apollo-federation-include-trace"
}
4 changes: 2 additions & 2 deletions examples/src/main/resources/gateway/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
"keywords": [],
"license": "MIT",
"dependencies": {
"apollo-server": "~2.9.15",
"@apollo/gateway": "~0.11.6",
"apollo-server": "~2.19.1",
"@apollo/gateway": "~0.21.4",
"@apollo/federation": "latest"
}
}
25 changes: 9 additions & 16 deletions examples/src/main/scala/caliban/federation/FederatedApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ package caliban.federation
import caliban.GraphQL.graphQL
import caliban.federation.CharacterService.CharacterService
import caliban.federation.EpisodeService.EpisodeService
import caliban.schema.Annotations.{ GQLDeprecated, GQLDescription }
import caliban.schema.{ ArgBuilder, GenericSchema, Schema }
import caliban.wrappers.ApolloTracing._
import caliban.wrappers.Wrappers.{ maxDepth, maxFields, printSlowQueries, timeout }
import caliban.{ GraphQL, RootResolver }
import caliban.federation.tracing.ApolloFederatedTracing
import caliban.schema.Annotations.{GQLDeprecated, GQLDescription}
import caliban.schema.{ArgBuilder, GenericSchema, Schema}
import caliban.wrappers.Wrappers.{maxDepth, maxFields, printSlowQueries, timeout}
import caliban.{GraphQL, RootResolver}
import zio.URIO
import zio.clock.Clock
import zio.console.Console
import zio.duration._
import zio.stream.ZStream
import zio.query.ZQuery
import zio.stream.ZStream

import scala.language.postfixOps

Expand All @@ -23,17 +23,10 @@ object FederatedApi {
maxDepth(30) |+| // query analyzer that limit query depth
timeout(3 seconds) |+| // wrapper that fails slow queries
printSlowQueries(500 millis) |+| // wrapper that logs slow queries
apolloTracing // wrapper for https://github.com/apollographql/apollo-tracing
ApolloFederatedTracing.wrapper // wrapper for https://github.com/apollographql/apollo-tracing

object Characters extends GenericSchema[CharacterService] {
import caliban.federation.FederationData.characters.{
Character,
CharacterArgs,
CharactersArgs,
Episode,
EpisodeArgs,
Role
}
import caliban.federation.FederationData.characters.{Character, CharacterArgs, CharactersArgs, Episode, EpisodeArgs, Role}

case class Queries(
@GQLDescription("Return all characters from a given origin")
Expand Down Expand Up @@ -81,7 +74,7 @@ object FederatedApi {
}

object Episodes extends GenericSchema[EpisodeService] {
import caliban.federation.FederationData.episodes.{ Episode, EpisodeArgs, EpisodesArgs }
import caliban.federation.FederationData.episodes.{Episode, EpisodeArgs, EpisodesArgs}

case class Queries(
episode: EpisodeArgs => URIO[EpisodeService, Option[Episode]],
Expand Down
Loading

0 comments on commit 6340345

Please sign in to comment.