Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add manual implementation of APIv3 HTTP endpoints #5054

Merged
merged 16 commits into from
Dec 30, 2023
10 changes: 9 additions & 1 deletion cmd/query/app/apiv3/grpc_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,15 @@ import (
)

// RegisterGRPCGateway registers api_v3 endpoints into provided mux.
func RegisterGRPCGateway(ctx context.Context, logger *zap.Logger, r *mux.Router, basePath string, grpcEndpoint string, grpcTLS *tlscfg.Options, tm *tenancy.Manager) error {
func RegisterGRPCGateway(
ctx context.Context,
logger *zap.Logger,
r *mux.Router,
basePath string,
grpcEndpoint string,
grpcTLS *tlscfg.Options,
tm *tenancy.Manager,
) error {
grpcEndpoint = netutils.FixLocalhost([]string{grpcEndpoint})[0]
jsonpb := &runtime.JSONPb{}

Expand Down
47 changes: 25 additions & 22 deletions cmd/query/app/apiv3/grpc_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"testing"
Expand Down Expand Up @@ -60,6 +61,9 @@ const (
// REGENERATE_SNAPSHOTS=true go test -v ./cmd/query/app/apiv3/...
var regenerateSnapshots = os.Getenv("REGENERATE_SNAPSHOTS") == "true"

// The tests in http_gateway_test.go set this to true to use manual gateway implementation.
var useHTTPGateway = false
Copy link
Member Author

@yurishkuro yurishkuro Dec 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not the cleanest way, but hopefully we'll just delete grpc-gateway based implementation soon and simplify/consolidate the tests


type testGateway struct {
reader *spanstoremocks.Reader
url string
Expand All @@ -76,6 +80,9 @@ func setupGRPCGateway(
serverTLS, clientTLS *tlscfg.Options,
tenancyOptions tenancy.Options,
) *testGateway {
if useHTTPGateway {
return setupHTTPGateway(t, basePath, serverTLS, clientTLS, tenancyOptions)
}
gw := &testGateway{
reader: &spanstoremocks.Reader{},
}
Expand Down Expand Up @@ -159,11 +166,12 @@ func (gw *testGateway) execRequest(t *testing.T, gwReq *gatewayRequest) ([]byte,
func verifySnapshot(t *testing.T, body []byte) []byte {
// reformat JSON body with indentation, to make diffing easier
var data interface{}
require.NoError(t, json.Unmarshal(body, &data))
require.NoError(t, json.Unmarshal(body, &data), "response: %s", string(body))
body, err := json.MarshalIndent(data, "", " ")
require.NoError(t, err)

snapshotFile := filepath.Join(snapshotLocation, strings.ReplaceAll(t.Name(), "/", "_")+".json")
testName := path.Base(t.Name())
snapshotFile := filepath.Join(snapshotLocation, testName+".json")
if regenerateSnapshots {
os.WriteFile(snapshotFile, body, 0o644)
}
Expand All @@ -177,17 +185,6 @@ func parseResponse(t *testing.T, body []byte, obj gogoproto.Message) {
require.NoError(t, gogojsonpb.Unmarshal(bytes.NewBuffer(body), obj))
}

func parseChunkResponse(t *testing.T, body []byte, obj gogoproto.Message) {
// Unwrap the 'result' container generated by the gateway.
// See https://github.com/grpc-ecosystem/grpc-gateway/issues/2189
type resultWrapper struct {
Result json.RawMessage `json:"result"`
Copy link
Member Author

@yurishkuro yurishkuro Dec 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to avoid this ugliness use the new helper types from IDL and marshal in one go.

}
var result resultWrapper
require.NoError(t, json.Unmarshal(body, &result))
parseResponse(t, result.Result, obj)
}

func makeTestTrace() (*model.Trace, model.TraceID) {
traceID := model.NewTraceID(150, 160)
return &model.Trace{
Expand Down Expand Up @@ -282,11 +279,13 @@ func runGatewayGetTrace(t *testing.T, gw *testGateway, setupRequest func(*http.R
require.Equal(t, http.StatusOK, statusCode, "response=%s", string(body))
body = verifySnapshot(t, body)

var spansResponse api_v3.SpansResponseChunk
parseChunkResponse(t, body, &spansResponse)
var response api_v3.GRPCGatewayWrapper
parseResponse(t, body, &response)

assert.Len(t, spansResponse.GetResourceSpans(), 1)
assert.Equal(t, bytesOfTraceID(t, traceID.High, traceID.Low), spansResponse.GetResourceSpans()[0].GetScopeSpans()[0].GetSpans()[0].GetTraceId())
assert.Len(t, response.Result.ResourceSpans, 1)
assert.Equal(t,
bytesOfTraceID(t, traceID.High, traceID.Low),
response.Result.ResourceSpans[0].ScopeSpans[0].Spans[0].TraceId)
}

func runGatewayFindTraces(t *testing.T, gw *testGateway, setupRequest func(*http.Request)) {
Expand All @@ -307,11 +306,13 @@ func runGatewayFindTraces(t *testing.T, gw *testGateway, setupRequest func(*http
require.Equal(t, http.StatusOK, statusCode, "response=%s", string(body))
body = verifySnapshot(t, body)

var spansResponse api_v3.SpansResponseChunk
parseChunkResponse(t, body, &spansResponse)
var response api_v3.GRPCGatewayWrapper
parseResponse(t, body, &response)

assert.Len(t, spansResponse.GetResourceSpans(), 1)
assert.Equal(t, bytesOfTraceID(t, traceID.High, traceID.Low), spansResponse.GetResourceSpans()[0].GetScopeSpans()[0].GetSpans()[0].GetTraceId())
assert.Len(t, response.Result.ResourceSpans, 1)
assert.Equal(t,
bytesOfTraceID(t, traceID.High, traceID.Low),
response.Result.ResourceSpans[0].ScopeSpans[0].Spans[0].TraceId)
}

func bytesOfTraceID(t *testing.T, high, low uint64) []byte {
Expand Down Expand Up @@ -382,8 +383,10 @@ func TestGRPCGatewayTenancyRejection(t *testing.T) {
// We don't set tenant header
response, err := http.DefaultClient.Do(req)
require.NoError(t, err)
body, err := io.ReadAll(response.Body)
require.NoError(t, err)
require.NoError(t, response.Body.Close())
require.Equal(t, http.StatusForbidden, response.StatusCode)
require.Equal(t, http.StatusUnauthorized, response.StatusCode, "response=%s", string(body))

// Try again with tenant header set
tm := tenancy.NewManager(&tenancyOptions)
Expand Down
225 changes: 225 additions & 0 deletions cmd/query/app/apiv3/http_gateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
// Copyright (c) 2023 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package apiv3

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/gorilla/mux"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/proto-gen/api_v3"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
traceIDParam = "traceID"

routeGetTrace = "/api/v3/traces/{traceID}"
routeFindTraces = "/api/v3/traces"
routeGetServices = "/api/v3/services"
routeGetOperations = "/api/v3/operations"
)

// HTTPGateway exposes APIv3 HTTP endpoints.
type HTTPGateway struct {
QueryService *querysvc.QueryService
TenancyMgr *tenancy.Manager
Logger *zap.Logger
Tracer *jtracer.JTracer
}

// RegisterRoutes registers HTTP endpoints for APIv3 into provided mux.
// The called can create a subrouter if it needs to prepend a base path.
func (h *HTTPGateway) RegisterRoutes(router *mux.Router) {
h.addRoute(router, h.getTrace, routeGetTrace).Methods(http.MethodGet)
h.addRoute(router, h.findTraces, routeFindTraces).Methods(http.MethodGet)
h.addRoute(router, h.getServices, routeGetServices).Methods(http.MethodGet)
h.addRoute(router, h.getOperations, routeGetOperations).Methods(http.MethodGet)
}

// addRoute adds a new endpoint to the router with given path and handler function.
// This code is mostly copied from ../http_handler.
func (h *HTTPGateway) addRoute(
router *mux.Router,
f func(http.ResponseWriter, *http.Request),
route string,
args ...interface{},
) *mux.Route {
var handler http.Handler = http.HandlerFunc(f)
if h.TenancyMgr.Enabled {
handler = tenancy.ExtractTenantHTTPHandler(h.TenancyMgr, handler)
}
traceMiddleware := otelhttp.NewHandler(
otelhttp.WithRouteTag(route, handler),
route,
otelhttp.WithTracerProvider(h.Tracer.OTEL))
return router.HandleFunc(route, traceMiddleware.ServeHTTP)
}

// tryHandleError checks if the passed error is not nil and handles it by writing
// an error response to the client. Otherwise it returns false.
func (h *HTTPGateway) tryHandleError(w http.ResponseWriter, err error, statusCode int) bool {
if err == nil {
return false
}
if errors.Is(err, spanstore.ErrTraceNotFound) {
statusCode = http.StatusNotFound
}
if statusCode == http.StatusInternalServerError {
h.Logger.Error("HTTP handler, Internal Server Error", zap.Error(err))
}
errorResponse := api_v3.GRPCGatewayError{
Error: &api_v3.GRPCGatewayError_GRPCGatewayErrorDetails{
HttpCode: int32(statusCode),
Message: err.Error(),
},
}
resp, _ := json.Marshal(&errorResponse)
http.Error(w, string(resp), statusCode)
return true

Check warning on line 93 in cmd/query/app/apiv3/http_gateway.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/apiv3/http_gateway.go#L79-L93

Added lines #L79 - L93 were not covered by tests
}

func (h *HTTPGateway) returnSpans(spans []*model.Span, w http.ResponseWriter) {
resourceSpans, err := modelToOTLP(spans)
if h.tryHandleError(w, err, http.StatusInternalServerError) {
return
}

Check warning on line 100 in cmd/query/app/apiv3/http_gateway.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/apiv3/http_gateway.go#L99-L100

Added lines #L99 - L100 were not covered by tests
for _, rs := range resourceSpans {
for _, ss := range rs.ScopeSpans {
for _, s := range ss.Spans {
if len(s.ParentSpanId) == 0 {
// If ParentSpanId is empty array then gogo/jsonpb renders it as empty string.
// To match the output with grpc-gateway we set it to nil and it won't be included.
s.ParentSpanId = nil
}
}
}
}
response := &api_v3.GRPCGatewayWrapper{
Result: &api_v3.SpansResponseChunk{
ResourceSpans: resourceSpans,
},
}

h.marshalResponse(response, w)
}

func (h *HTTPGateway) marshalResponse(response proto.Message, w http.ResponseWriter) {
_ = new(jsonpb.Marshaler).Marshal(w, response)
}

func (h *HTTPGateway) getTrace(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
traceIDVar := vars[traceIDParam]
traceID, err := model.TraceIDFromString(traceIDVar)
if h.tryHandleError(w, err, http.StatusBadRequest) {
return
}

Check warning on line 131 in cmd/query/app/apiv3/http_gateway.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/apiv3/http_gateway.go#L130-L131

Added lines #L130 - L131 were not covered by tests
trace, err := h.QueryService.GetTrace(r.Context(), traceID)
if h.tryHandleError(w, err, http.StatusInternalServerError) {
return
}

Check warning on line 135 in cmd/query/app/apiv3/http_gateway.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/apiv3/http_gateway.go#L134-L135

Added lines #L134 - L135 were not covered by tests
h.returnSpans(trace.Spans, w)
}

func (h *HTTPGateway) findTraces(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
queryParams := &spanstore.TraceQueryParameters{
ServiceName: query.Get("query.service_name"),
OperationName: query.Get("query.operation_name"),
Tags: nil, // most curiously not supported by grpc-gateway
}
if n := query.Get("query.num_traces"); n != "" {
numTraces, err := strconv.Atoi(n)
if h.tryHandleError(w, err, http.StatusBadRequest) {
return
}
queryParams.NumTraces = numTraces

Check warning on line 151 in cmd/query/app/apiv3/http_gateway.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/apiv3/http_gateway.go#L147-L151

Added lines #L147 - L151 were not covered by tests
}
timeMin := query.Get("query.start_time_min")
timeMax := query.Get("query.start_time_max")
if timeMin == "" || timeMax == "" {
err := fmt.Errorf("query.start_time_min and query.start_time_max are required")
h.tryHandleError(w, err, http.StatusBadRequest)
return
}

Check warning on line 159 in cmd/query/app/apiv3/http_gateway.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/apiv3/http_gateway.go#L156-L159

Added lines #L156 - L159 were not covered by tests
timeMinParsed, err := time.Parse(time.RFC3339Nano, timeMin)
if h.tryHandleError(w, err, http.StatusBadRequest) {
return
}

Check warning on line 163 in cmd/query/app/apiv3/http_gateway.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/apiv3/http_gateway.go#L162-L163

Added lines #L162 - L163 were not covered by tests
queryParams.StartTimeMin = timeMinParsed
timeMaxParsed, err := time.Parse(time.RFC3339Nano, timeMax)
if h.tryHandleError(w, err, http.StatusBadRequest) {
return
}

Check warning on line 168 in cmd/query/app/apiv3/http_gateway.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/apiv3/http_gateway.go#L167-L168

Added lines #L167 - L168 were not covered by tests
queryParams.StartTimeMax = timeMaxParsed
if d := query.Get("duration_min"); d != "" {
dur, err := time.ParseDuration(d)
if h.tryHandleError(w, err, http.StatusBadRequest) {
return
}
queryParams.DurationMin = dur

Check warning on line 175 in cmd/query/app/apiv3/http_gateway.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/apiv3/http_gateway.go#L171-L175

Added lines #L171 - L175 were not covered by tests
}
if d := query.Get("duration_max"); d != "" {
dur, err := time.ParseDuration(d)
if h.tryHandleError(w, err, http.StatusBadRequest) {
return
}
queryParams.DurationMax = dur

Check warning on line 182 in cmd/query/app/apiv3/http_gateway.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/apiv3/http_gateway.go#L178-L182

Added lines #L178 - L182 were not covered by tests
}

traces, err := h.QueryService.FindTraces(r.Context(), queryParams)
// TODO how do we distinguish internal error from bad parameters for FindTrace?
if h.tryHandleError(w, err, http.StatusInternalServerError) {
return
}

Check warning on line 189 in cmd/query/app/apiv3/http_gateway.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/apiv3/http_gateway.go#L188-L189

Added lines #L188 - L189 were not covered by tests
var spans []*model.Span
for _, trace := range traces {
spans = append(spans, trace.Spans...)
}
h.returnSpans(spans, w)
}

func (h *HTTPGateway) getServices(w http.ResponseWriter, r *http.Request) {
services, err := h.QueryService.GetServices(r.Context())
if h.tryHandleError(w, err, http.StatusInternalServerError) {
return
}

Check warning on line 201 in cmd/query/app/apiv3/http_gateway.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/apiv3/http_gateway.go#L200-L201

Added lines #L200 - L201 were not covered by tests
h.marshalResponse(&api_v3.GetServicesResponse{
Services: services,
}, w)
}

func (h *HTTPGateway) getOperations(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
queryParams := spanstore.OperationQueryParameters{
ServiceName: query.Get("service"),
SpanKind: query.Get("span_kind"),
}
operations, err := h.QueryService.GetOperations(r.Context(), queryParams)
if h.tryHandleError(w, err, http.StatusInternalServerError) {
return
}

Check warning on line 216 in cmd/query/app/apiv3/http_gateway.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/apiv3/http_gateway.go#L215-L216

Added lines #L215 - L216 were not covered by tests
apiOperations := make([]*api_v3.Operation, len(operations))
for i := range operations {
apiOperations[i] = &api_v3.Operation{
Name: operations[i].Name,
SpanKind: operations[i].SpanKind,
}
}
h.marshalResponse(&api_v3.GetOperationsResponse{Operations: apiOperations}, w)
}
Loading
Loading