Skip to content

Commit

Permalink
Zipkin trace propagation plugin (linkerd#2071)
Browse files Browse the repository at this point in the history
A trace propagation plugin that uses the Zipkin B3 headers to continue an existing trace context in case it exists.

Signed-off-by: Jose Maria Alvarez <josemaria.alvarezfernandez@elcorteingles.es>
  • Loading branch information
jmalvarezf-lmes authored and adleong committed Jul 23, 2018
1 parent 437e2f5 commit dcf89d6
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
io.buoyant.linkerd.protocol.h2.LinkerdTracePropagatorInitializer
io.buoyant.linkerd.protocol.h2.ZipkinTracePropagatorInitializer
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package io.buoyant.linkerd.protocol.h2

import java.util.Base64

import com.twitter.finagle.Stack
import com.twitter.finagle.buoyant.Sampler
import com.twitter.finagle.buoyant.h2.{Headers, LinkerdHeaders, Request}
import com.twitter.finagle.http.util.StringUtil
import com.twitter.finagle.tracing.{Flags, SpanId, TraceId}
import com.twitter.util.Try
import io.buoyant.linkerd.{TracePropagator, TracePropagatorInitializer}

class ZipkinTracePropagatorInitializer extends TracePropagatorInitializer {
override val configClass = classOf[ZipkinTracePropagatorConfig]
override val configId = "io.l5d.zipkin"
}

case class ZipkinTracePropagatorConfig() extends H2TracePropagatorConfig {
override def mk(params: Stack.Params): TracePropagator[Request] = new ZipkinTracePropagator
}

class ZipkinTracePropagator extends TracePropagator[Request] {
/**
* Read the trace id from the request, if it has one.
*/
override def traceId(req: Request): Option[TraceId] = {
var traceId = ZipkinTrace.get(req.headers)
LinkerdHeaders.Ctx.Trace.clear(req.headers)
traceId
}

/**
* Return a sampler which decides if the given request should be sampled, based on properties
* of the request (zipkin or linkerd if zipkin not present). If None is returned, the decision of whether to sample the request is deferred
* to the tracer.
*/
override def sampler(req: Request): Option[Sampler] = {
var sampler = ZipkinTrace.getSampler(req.headers).map(Sampler(_))
LinkerdHeaders.Sample.clear(req.headers)
sampler
}

/**
* Write the trace id onto a request.
*/
override def setContext(
req: Request,
traceId: TraceId
): Unit = {
ZipkinTrace.set(req.headers, traceId)
}
}

object ZipkinTrace {

val ZipkinSpanHeader = "x-b3-spanid"
val ZipkinParentHeader = "x-b3-parentspanid"
val ZipkinTraceHeader = "x-b3-traceid"
val ZipkinSampleHeader = "x-b3-sampled"
val ZipkinFlagsHeader = "x-b3-flags"

def get(headers: Headers): Option[TraceId] = {
val trace = caseInsensitiveGet(headers, ZipkinTraceHeader).flatMap(SpanId.fromString)
val parent = caseInsensitiveGet(headers, ZipkinParentHeader).flatMap(SpanId.fromString)
val span = caseInsensitiveGet(headers, ZipkinSpanHeader).flatMap(SpanId.fromString)
val sample = caseInsensitiveGet(headers, ZipkinSampleHeader).map(StringUtil.toBoolean)
val flags = caseInsensitiveGet(headers, ZipkinFlagsHeader).map(StringUtil.toSomeLong) match {
case Some(f) => Flags(f)
case None => Flags()
}
span.map { s =>
TraceId(trace, parent, s, sample, flags)
}
}

def set(headers: Headers, id: TraceId): Unit = {
val _ = headers.set(ZipkinSpanHeader, id.spanId.toString)
val __ = headers.set(ZipkinTraceHeader, id.traceId.toString)
val ___ = headers.set(ZipkinParentHeader, id.parentId.toString)
val ____ = headers.set(ZipkinSampleHeader, (if ((id.sampled exists { _ == true })) 1 else 0).toString)
val _____ = headers.set(ZipkinFlagsHeader, id.flags.toLong.toString)
}

def getSampler(headers: Headers): Option[Float] =
headers.get(ZipkinSampleHeader).flatMap { s =>
Try(s.toFloat).toOption.map {
case v if v < 0 => 0.0f
case v if v > 1 => 1.0f
case v => v
}
}

private def caseInsensitiveGet(headers: Headers, key: String): Option[String] =
headers.toSeq.iterator.collectFirst { case (k, v) if key.equalsIgnoreCase(k) => v }
}
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
io.buoyant.linkerd.protocol.LinkerdTracePropagatorInitializer
io.buoyant.linkerd.protocol.ZipkinTracePropagatorInitializer
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package io.buoyant.linkerd.protocol

import java.util.Base64

import com.twitter.finagle.Stack
import com.twitter.finagle.buoyant.Sampler
import com.twitter.finagle.buoyant.linkerd.Headers
import com.twitter.finagle.http.util.StringUtil
import com.twitter.finagle.http.{HeaderMap, Request}
import com.twitter.finagle.tracing.{Flags, SpanId, TraceId}
import com.twitter.util.Try
import io.buoyant.linkerd.{TracePropagator, TracePropagatorInitializer}

class ZipkinTracePropagatorInitializer extends TracePropagatorInitializer {
override val configClass = classOf[ZipkinTracePropagatorConfig]
override val configId = "io.l5d.zipkin"
}

case class ZipkinTracePropagatorConfig() extends HttpTracePropagatorConfig {
override def mk(params: Stack.Params): TracePropagator[Request] = new ZipkinTracePropagator
}

class ZipkinTracePropagator extends TracePropagator[Request] {
/**
* Read the trace id from the request, if it has one.
*/
override def traceId(req: Request): Option[TraceId] = {
var traceId = ZipkinTrace.get(req.headerMap)
Headers.Ctx.Trace.clear(req.headerMap)
traceId
}

/**
* Return a sampler which decides if the given request should be sampled, based on properties
* of the request (zipkin or linkerd if zipkin not present). If None is returned, the decision of whether to sample the request is deferred
* to the tracer.
*/
override def sampler(req: Request): Option[Sampler] = {
var sampler = ZipkinTrace.getSampler(req.headerMap).map(Sampler(_))
Headers.Sample.clear(req.headerMap)
sampler
}

/**
* Write the trace id onto a request.
*/
override def setContext(
req: Request,
traceId: TraceId
): Unit = {
ZipkinTrace.set(req.headerMap, traceId)
}

}

object ZipkinTrace {

val ZipkinSpanHeader = "x-b3-spanid"
val ZipkinParentHeader = "x-b3-parentspanid"
val ZipkinTraceHeader = "x-b3-traceid"
val ZipkinSampleHeader = "x-b3-sampled"
val ZipkinFlagsHeader = "x-b3-flags"

def get(headers: HeaderMap): Option[TraceId] = {
val trace = caseInsensitiveGet(headers, ZipkinTraceHeader).flatMap(SpanId.fromString)
val parent = caseInsensitiveGet(headers, ZipkinParentHeader).flatMap(SpanId.fromString)
val span = caseInsensitiveGet(headers, ZipkinSpanHeader).flatMap(SpanId.fromString)
val sample = caseInsensitiveGet(headers, ZipkinSampleHeader).map(StringUtil.toBoolean)
val flags = caseInsensitiveGet(headers, ZipkinFlagsHeader).map(StringUtil.toSomeLong) match {
case Some(f) => Flags(f)
case None => Flags()
}
span.map { s =>
TraceId(trace, parent, s, sample, flags)
}
}

def set(headers: HeaderMap, id: TraceId): Unit = {
val _ = headers.set(ZipkinSpanHeader, id.spanId.toString)
val __ = headers.set(ZipkinTraceHeader, id.traceId.toString)
val ___ = headers.set(ZipkinParentHeader, id.parentId.toString)
val ____ = headers.set(ZipkinSampleHeader, (if ((id.sampled exists { _ == true })) 1 else 0).toString)
val _____ = headers.set(ZipkinFlagsHeader, id.flags.toLong.toString)
}

def getSampler(headers: HeaderMap): Option[Float] =
headers.get(ZipkinSampleHeader).flatMap { s =>
Try(s.toFloat).toOption.map {
case v if v < 0 => 0.0f
case v if v > 1 => 1.0f
case v => v
}
}

private def caseInsensitiveGet(headers: HeaderMap, key: String): Option[String] =
headers.iterator.collectFirst { case (k, v) if key.equalsIgnoreCase(k) => v }
}

0 comments on commit dcf89d6

Please sign in to comment.