Skip to content

Commit

Permalink
Add manual implementation of APIv3 HTTP endpoints (#5054)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Part of #5052

## Description of the changes
- Pull in IDL change
jaegertracing/jaeger-idl#102
- Re-implement APIv3 HTTP endpoints without the use of grpc-gateway
- Share tests from grpc-gateway for manual implementation
  - Refactor tests to avoid duplication of snapshots
- Fix inconsistency between http and grpc tenancy interceptors where
HTTP was returning Unauthenticated in certain cases, but GRPC was always
returning Forbidden. Make them consistent: missing tenant header results
in Unauthenticated.

## Follow-ups
- [x] http implementation needs more unit tests (mostly error handling
and parameter variations)
- [ ] the new implementation is not hooked up into production code yet,
I first want to confirm it works with model-v2, and just in general
minimize the scope of a single PR

## How was this change tested?
- Using unit tests added in #5051, with additional enhancements

---------

Signed-off-by: Yuri Shkuro <github@ysh.us>
  • Loading branch information
yurishkuro authored Dec 30, 2023
1 parent 5082239 commit 718f666
Show file tree
Hide file tree
Showing 22 changed files with 790 additions and 212 deletions.
10 changes: 9 additions & 1 deletion cmd/query/app/apiv3/grpc_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,15 @@ import (
)

// RegisterGRPCGateway registers api_v3 endpoints into provided mux.
func RegisterGRPCGateway(ctx context.Context, logger *zap.Logger, r *mux.Router, basePath string, grpcEndpoint string, grpcTLS *tlscfg.Options, tm *tenancy.Manager) error {
func RegisterGRPCGateway(
ctx context.Context,
logger *zap.Logger,
r *mux.Router,
basePath string,
grpcEndpoint string,
grpcTLS *tlscfg.Options,
tm *tenancy.Manager,
) error {
grpcEndpoint = netutils.FixLocalhost([]string{grpcEndpoint})[0]
jsonpb := &runtime.JSONPb{}

Expand Down
60 changes: 28 additions & 32 deletions cmd/query/app/apiv3/grpc_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,16 @@ import (
"io"
"net"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"

gogojsonpb "github.com/gogo/protobuf/jsonpb"
gogoproto "github.com/gogo/protobuf/proto"
"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand All @@ -60,9 +58,13 @@ const (
// REGENERATE_SNAPSHOTS=true go test -v ./cmd/query/app/apiv3/...
var regenerateSnapshots = os.Getenv("REGENERATE_SNAPSHOTS") == "true"

// The tests in http_gateway_test.go set this to true to use manual gateway implementation.
var useHTTPGateway = false

type testGateway struct {
reader *spanstoremocks.Reader
url string
router *mux.Router
}

type gatewayRequest struct {
Expand All @@ -76,6 +78,9 @@ func setupGRPCGateway(
serverTLS, clientTLS *tlscfg.Options,
tenancyOptions tenancy.Options,
) *testGateway {
if useHTTPGateway {
return setupHTTPGateway(t, basePath, serverTLS, clientTLS, tenancyOptions)
}
gw := &testGateway{
reader: &spanstoremocks.Reader{},
}
Expand Down Expand Up @@ -159,11 +164,12 @@ func (gw *testGateway) execRequest(t *testing.T, gwReq *gatewayRequest) ([]byte,
func verifySnapshot(t *testing.T, body []byte) []byte {
// reformat JSON body with indentation, to make diffing easier
var data interface{}
require.NoError(t, json.Unmarshal(body, &data))
require.NoError(t, json.Unmarshal(body, &data), "response: %s", string(body))
body, err := json.MarshalIndent(data, "", " ")
require.NoError(t, err)

snapshotFile := filepath.Join(snapshotLocation, strings.ReplaceAll(t.Name(), "/", "_")+".json")
testName := path.Base(t.Name())
snapshotFile := filepath.Join(snapshotLocation, testName+".json")
if regenerateSnapshots {
os.WriteFile(snapshotFile, body, 0o644)
}
Expand All @@ -177,17 +183,6 @@ func parseResponse(t *testing.T, body []byte, obj gogoproto.Message) {
require.NoError(t, gogojsonpb.Unmarshal(bytes.NewBuffer(body), obj))
}

func parseChunkResponse(t *testing.T, body []byte, obj gogoproto.Message) {
// Unwrap the 'result' container generated by the gateway.
// See https://github.com/grpc-ecosystem/grpc-gateway/issues/2189
type resultWrapper struct {
Result json.RawMessage `json:"result"`
}
var result resultWrapper
require.NoError(t, json.Unmarshal(body, &result))
parseResponse(t, result.Result, obj)
}

func makeTestTrace() (*model.Trace, model.TraceID) {
traceID := model.NewTraceID(150, 160)
return &model.Trace{
Expand Down Expand Up @@ -282,36 +277,35 @@ func runGatewayGetTrace(t *testing.T, gw *testGateway, setupRequest func(*http.R
require.Equal(t, http.StatusOK, statusCode, "response=%s", string(body))
body = verifySnapshot(t, body)

var spansResponse api_v3.SpansResponseChunk
parseChunkResponse(t, body, &spansResponse)
var response api_v3.GRPCGatewayWrapper
parseResponse(t, body, &response)

assert.Len(t, spansResponse.GetResourceSpans(), 1)
assert.Equal(t, bytesOfTraceID(t, traceID.High, traceID.Low), spansResponse.GetResourceSpans()[0].GetScopeSpans()[0].GetSpans()[0].GetTraceId())
assert.Len(t, response.Result.ResourceSpans, 1)
assert.Equal(t,
bytesOfTraceID(t, traceID.High, traceID.Low),
response.Result.ResourceSpans[0].ScopeSpans[0].Spans[0].TraceId)
}

func runGatewayFindTraces(t *testing.T, gw *testGateway, setupRequest func(*http.Request)) {
trace, traceID := makeTestTrace()
q, qp := mockFindQueries()
gw.reader.
On("FindTraces", matchContext, mock.AnythingOfType("*spanstore.TraceQueryParameters")).
On("FindTraces", matchContext, qp).
Return([]*model.Trace{trace}, nil).Once()

q := url.Values{}
q.Set("query.service_name", "foobar")
q.Set("query.start_time_min", time.Now().Format(time.RFC3339))
q.Set("query.start_time_max", time.Now().Format(time.RFC3339))

body, statusCode := gw.execRequest(t, &gatewayRequest{
url: "/api/v3/traces?" + q.Encode(),
setupRequest: setupRequest,
})
require.Equal(t, http.StatusOK, statusCode, "response=%s", string(body))
body = verifySnapshot(t, body)

var spansResponse api_v3.SpansResponseChunk
parseChunkResponse(t, body, &spansResponse)
var response api_v3.GRPCGatewayWrapper
parseResponse(t, body, &response)

assert.Len(t, spansResponse.GetResourceSpans(), 1)
assert.Equal(t, bytesOfTraceID(t, traceID.High, traceID.Low), spansResponse.GetResourceSpans()[0].GetScopeSpans()[0].GetSpans()[0].GetTraceId())
assert.Len(t, response.Result.ResourceSpans, 1)
assert.Equal(t,
bytesOfTraceID(t, traceID.High, traceID.Low),
response.Result.ResourceSpans[0].ScopeSpans[0].Spans[0].TraceId)
}

func bytesOfTraceID(t *testing.T, high, low uint64) []byte {
Expand Down Expand Up @@ -382,8 +376,10 @@ func TestGRPCGatewayTenancyRejection(t *testing.T) {
// We don't set tenant header
response, err := http.DefaultClient.Do(req)
require.NoError(t, err)
body, err := io.ReadAll(response.Body)
require.NoError(t, err)
require.NoError(t, response.Body.Close())
require.Equal(t, http.StatusForbidden, response.StatusCode)
require.Equal(t, http.StatusUnauthorized, response.StatusCode, "response=%s", string(body))

// Try again with tenant header set
tm := tenancy.NewManager(&tenancyOptions)
Expand Down
Loading

0 comments on commit 718f666

Please sign in to comment.