Skip to content

Commit

Permalink
Collect Zipkin v2 json
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay committed Nov 8, 2017
1 parent 2b73fe9 commit dcdba0e
Show file tree
Hide file tree
Showing 20 changed files with 1,697 additions and 14 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
[submodule "jaeger-ui"]
path = jaeger-ui
url = https://github.com/uber/jaeger-ui
[submodule "zipkin-api"]
path = zipkin-api
url = git@github.com:openzipkin/zipkin-api.git
12 changes: 11 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ PROJECT_ROOT=github.com/uber/jaeger
TOP_PKGS := $(shell glide novendor | grep -v -e ./thrift-gen/... -e ./examples/... -e ./scripts/...)

# all .go files that don't exist in hidden directories
ALL_SRC := $(shell find . -name "*.go" | grep -v -e vendor -e thrift-gen \
ALL_SRC := $(shell find . -name "*.go" | grep -v -e vendor -e thrift-gen -e swagger-gen \
-e ".*/\..*" \
-e ".*/_.*" \
-e ".*/mocks.*")
Expand All @@ -28,6 +28,11 @@ THRIFT_GO_ARGS=thrift_import="github.com/apache/thrift/lib/go/thrift"
THRIFT_GEN=$(shell which thrift-gen)
THRIFT_GEN_DIR=thrift-gen

SWAGGER_IMG_VER=0.12.0
SWAGGER_IMAGE=quay.io/goswagger/swagger:$(SWAGGER_IMG_VER)
SWAGGER=docker run --rm -it -u ${shell id -u} -v "${PWD}:/go/src/${PROJECT_ROOT}" -w /go/src/${PROJECT_ROOT} $(SWAGGER_IMAGE)
SWAGGER_GEN_DIR=swagger-gen

PASS=$(shell printf "\033[32mPASS\033[0m")
FAIL=$(shell printf "\033[31mFAIL\033[0m")
COLORIZE=$(SED) ''/PASS/s//$(PASS)/'' | $(SED) ''/FAIL/s//$(FAIL)/''
Expand Down Expand Up @@ -209,6 +214,11 @@ idl-submodule:
git submodule init
git submodule update

.PHONY: generate-zipkin-swagger
generate-zipkin-swagger:
$(SWAGGER) generate server -f ./zipkin-api/zipkin2-api.yaml -t $(SWAGGER_GEN_DIR) -O PostSpans --exclude-main
rm $(SWAGGER_GEN_DIR)/restapi/operations/post_spans_urlbuilder.go $(SWAGGER_GEN_DIR)/restapi/server.go $(SWAGGER_GEN_DIR)/restapi/configure_zipkin.go $(SWAGGER_GEN_DIR)/models/trace.go $(SWAGGER_GEN_DIR)/models/list_of_traces.go $(SWAGGER_GEN_DIR)/models/dependency_link.go

.PHONY: thrift-image
thrift-image:
$(THRIFT) -version
Expand Down
68 changes: 61 additions & 7 deletions cmd/collector/app/zipkin/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,43 @@ import (
"time"

"github.com/apache/thrift/lib/go/thrift"
"github.com/go-openapi/loads"
"github.com/go-openapi/swag"
"github.com/gorilla/mux"
"github.com/pkg/errors"
tchanThrift "github.com/uber/tchannel-go/thrift"

"github.com/uber/jaeger/cmd/collector/app"
"github.com/uber/jaeger/swagger-gen/models"
"github.com/uber/jaeger/swagger-gen/restapi"
"github.com/uber/jaeger/swagger-gen/restapi/operations"
"github.com/uber/jaeger/thrift-gen/zipkincore"
)

// APIHandler handles all HTTP calls to the collector
type APIHandler struct {
zipkinSpansHandler app.ZipkinSpansHandler
zipkinV2API *operations.ZipkinAPI
}

// NewAPIHandler returns a new APIHandler
func NewAPIHandler(
zipkinSpansHandler app.ZipkinSpansHandler,
) *APIHandler {
) (*APIHandler, error) {
swaggerSpec, err := loads.Analyzed(restapi.SwaggerJSON, "")
if err != nil {
return nil, errors.Wrapf(err, "Failed to create zipkin swagger")
}
return &APIHandler{
zipkinSpansHandler: zipkinSpansHandler,
}
zipkinV2API: operations.NewZipkinAPI(swaggerSpec),
}, nil
}

// RegisterRoutes registers Zipkin routes
func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
router.HandleFunc("/api/v1/spans", aH.saveSpans).Methods(http.MethodPost)
router.HandleFunc("/api/v2/spans", aH.saveSpansV2).Methods(http.MethodPost)
}

func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -84,15 +97,56 @@ func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) {
return
}

if err := aH.saveThriftSpans(tSpans); err != nil {
http.Error(w, fmt.Sprintf("Cannot submit Zipkin batch: %v", err), http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusAccepted)
}

func (aH *APIHandler) saveSpansV2(w http.ResponseWriter, r *http.Request) {
bRead := r.Body
defer r.Body.Close()

bodyBytes, err := ioutil.ReadAll(bRead)
if err != nil {
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusInternalServerError)
return
}

var spans *models.ListOfSpans = &models.ListOfSpans{}
if err = swag.ReadJSON(bodyBytes, spans); err != nil {
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
return
}
if err = spans.Validate(aH.zipkinV2API.Formats()); err != nil {
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
return
}

tSpans, err := spansV2ToThrift(spans)
if err != nil {
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
return
}

if err := aH.saveThriftSpans(tSpans); err != nil {
http.Error(w, fmt.Sprintf("Cannot submit Zipkin batch: %v", err), http.StatusInternalServerError)
return
}

w.WriteHeader(operations.PostSpansAcceptedCode)
}

func (aH *APIHandler) saveThriftSpans(tSpans []*zipkincore.Span) error {
if len(tSpans) > 0 {
ctx, _ := tchanThrift.NewContext(time.Minute)
if _, err = aH.zipkinSpansHandler.SubmitZipkinBatch(ctx, tSpans); err != nil {
http.Error(w, fmt.Sprintf("Cannot submit Zipkin batch: %v", err), http.StatusInternalServerError)
return
if _, err := aH.zipkinSpansHandler.SubmitZipkinBatch(ctx, tSpans); err != nil {
return err
}
}

w.WriteHeader(http.StatusAccepted)
return nil
}

func deserializeThrift(b []byte) ([]*zipkincore.Span, error) {
Expand Down
5 changes: 3 additions & 2 deletions cmd/collector/app/zipkin/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (p *mockZipkinHandler) getSpans() []*zipkincore.Span {

func initializeTestServer(err error) (*httptest.Server, *APIHandler) {
r := mux.NewRouter()
handler := NewAPIHandler(&mockZipkinHandler{err: err})
handler, _ := NewAPIHandler(&mockZipkinHandler{err: err})
handler.RegisterRoutes(r)
return httptest.NewServer(r), handler
}
Expand Down Expand Up @@ -224,7 +224,8 @@ func TestDeserializeWithBadListStart(t *testing.T) {
}

func TestCannotReadBodyFromRequest(t *testing.T) {
handler := NewAPIHandler(&mockZipkinHandler{})
handler, err := NewAPIHandler(&mockZipkinHandler{})
require.NoError(t, err)
req, err := http.NewRequest(http.MethodPost, "whatever", &errReader{})
assert.NoError(t, err)
rw := dummyResponseWriter{}
Expand Down
99 changes: 99 additions & 0 deletions cmd/collector/app/zipkin/jsonv2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (c) 2017 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zipkin

import (
"github.com/uber/jaeger/model"
"github.com/uber/jaeger/swagger-gen/models"
"github.com/uber/jaeger/thrift-gen/zipkincore"
)

func spansV2ToThrift(spans *models.ListOfSpans) ([]*zipkincore.Span, error) {
var tSpans []*zipkincore.Span
for _, span := range *spans {
tSpan, err := spanV2ToThrift(*span)
if err != nil {
return nil, err
}
tSpans = append(tSpans, tSpan)
}
return tSpans, nil

return nil, nil
}

func spanV2ToThrift(s models.Span) (*zipkincore.Span, error) {
id, err := model.SpanIDFromString(cutLongID(*s.ID))
if err != nil {
return nil, err
}
traceID, err := model.TraceIDFromString(*s.TraceID)
if err != nil {
return nil, err
}

tSpan := &zipkincore.Span{
ID: int64(id),
TraceID: int64(traceID.Low),
Name: s.Name,
Debug: s.Debug,
Timestamp: &s.Timestamp,
Duration: &s.Duration,
}

if len(s.ParentID) > 0 {
parentID, err := model.SpanIDFromString(cutLongID(s.ParentID))
if err != nil {
return nil, err
}
signed := int64(parentID)
tSpan.ParentID = &signed
}

for _, a := range s.Annotations {
tA, err := annToThrift(a)
if err != nil {
return nil, err
}
tSpan.Annotations = append(tSpan.Annotations, tA)
}

for k, v := range s.Tags {
ba := &zipkincore.BinaryAnnotation{
Key: k,
Value: []byte(v),
AnnotationType: zipkincore.AnnotationType_STRING,
// TODO endpoint
}
tSpan.BinaryAnnotations = append(tSpan.BinaryAnnotations, ba)
}

// TODO
//s.Kind
//s.LocalEndpoint
//s.RemoteEndpoint

return tSpan, nil
}

func annToThrift(a *models.Annotation) (*zipkincore.Annotation, error) {
ta := &zipkincore.Annotation{
Value: a.Value,
Timestamp: a.Timestamp,
// TODO host - endpoint
}

return ta, nil
}
7 changes: 6 additions & 1 deletion cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,13 @@ func startZipkinHTTPAPI(
recoveryHandler func(http.Handler) http.Handler,
) {
if zipkinPort != 0 {
zHandler, err := zipkin.NewAPIHandler(zipkinSpansHandler)
if err != nil {
logger.Fatal("Failed to initialize Zipkin handler", zap.Error(err))
}
r := mux.NewRouter()
zipkin.NewAPIHandler(zipkinSpansHandler).RegisterRoutes(r)
zHandler.RegisterRoutes(r)

httpPortStr := ":" + strconv.Itoa(zipkinPort)
logger.Info("Listening for Zipkin HTTP traffic", zap.Int("zipkin.http-port", zipkinPort))

Expand Down
15 changes: 13 additions & 2 deletions crossdock/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ services:
- java
- python
- zipkin-brave-json
- zipkin-brave-json-v2
- zipkin-brave-thrift

environment:
- WAIT_FOR=test_driver,go,node,java,python,zipkin-brave-thrift,zipkin-brave-json
- WAIT_FOR=test_driver,go,node,java,python,zipkin-brave-thrift,zipkin-brave-json,zipkin-brave-json-v2
- WAIT_FOR_TIMEOUT=60s

- CALL_TIMEOUT=60s

- AXIS_CLIENT=test_driver
- AXIS_SERVICES=go,node,java,python,zipkin-brave-json,zipkin-brave-thrift
- AXIS_SERVICES=go,node,java,python,zipkin-brave-json,zipkin-brave-json-v2,zipkin-brave-thrift

- BEHAVIOR_ENDTOEND=client,services

Expand All @@ -37,6 +38,8 @@ services:
image: jaegertracing/xdock-java
ports:
- "8080-8082"
depends_on:
- jaeger-agent

python:
image: jaegertracing/xdock-py
Expand All @@ -50,6 +53,14 @@ services:
environment:
- ENCODING=JSON

zipkin-brave-json-v2:
image: jaegertracing/xdock-zipkin-brave
ports:
- "8080-8081"
environment:
- ENCODING=JSON
- JSON_ENCODER=JSON_V2

zipkin-brave-thrift:
image: jaegertracing/xdock-zipkin-brave
ports:
Expand Down
2 changes: 1 addition & 1 deletion scripts/updateLicenses.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

set -e

python scripts/updateLicense.py $(git ls-files "*\.go" | grep -v -e thrift-gen)
python scripts/updateLicense.py $(git ls-files "*\.go" | grep -v -e thrift-gen -e swagger-gen)
Loading

0 comments on commit dcdba0e

Please sign in to comment.