Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate query-service functionality from http handler #1312

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions cmd/query/app/handler_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ import (

"github.com/opentracing/opentracing-go"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model/adjuster"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// HandlerOption is a function that sets some option on the APIHandler
Expand All @@ -40,13 +37,6 @@ func (handlerOptions) Logger(logger *zap.Logger) HandlerOption {
}
}

// Adjusters creates a HandlerOption that initializes the sequence of Adjusters on the APIHandler,
func (handlerOptions) Adjusters(adjusters ...adjuster.Adjuster) HandlerOption {
return func(apiHandler *APIHandler) {
apiHandler.adjuster = adjuster.Sequence(adjusters...)
}
}

// BasePath creates a HandlerOption that initializes the base path for all HTTP routes
func (handlerOptions) BasePath(prefix string) HandlerOption {
return func(apiHandler *APIHandler) {
Expand All @@ -68,20 +58,6 @@ func (handlerOptions) QueryLookbackDuration(queryLookbackDuration time.Duration)
}
}

// ArchiveSpanReader creates a HandlerOption that initializes lookback duration
func (handlerOptions) ArchiveSpanReader(reader spanstore.Reader) HandlerOption {
return func(apiHandler *APIHandler) {
apiHandler.archiveSpanReader = reader
}
}

// ArchiveSpanWriter creates a HandlerOption that initializes lookback duration
func (handlerOptions) ArchiveSpanWriter(writer spanstore.Writer) HandlerOption {
return func(apiHandler *APIHandler) {
apiHandler.archiveSpanWriter = writer
}
}

// Tracer creates a HandlerOption that initializes OpenTracing tracer
func (handlerOptions) Tracer(tracer opentracing.Tracer) HandlerOption {
return func(apiHandler *APIHandler) {
Expand Down
155 changes: 52 additions & 103 deletions cmd/query/app/handler.go → cmd/query/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@ import (
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/model/adjuster"
uiconv "github.com/jaegertracing/jaeger/model/converter/json"
ui "github.com/jaegertracing/jaeger/model/json"
"github.com/jaegertracing/jaeger/pkg/multierror"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand All @@ -48,10 +47,6 @@ const (
defaultAPIPrefix = "api"
)

var (
errNoArchiveSpanStorage = errors.New("archive span storage was not configured")
)

// HTTPHandler handles http requests
type HTTPHandler interface {
RegisterRoutes(router *mux.Router)
Expand All @@ -78,23 +73,18 @@ func NewRouter() *mux.Router {

// APIHandler implements the query service public API by registering routes at httpPrefix
type APIHandler struct {
spanReader spanstore.Reader
archiveSpanReader spanstore.Reader
archiveSpanWriter spanstore.Writer
dependencyReader dependencystore.Reader
adjuster adjuster.Adjuster
logger *zap.Logger
queryParser queryParser
basePath string
apiPrefix string
tracer opentracing.Tracer
queryService *querysvc.QueryService
queryParser queryParser
basePath string
apiPrefix string
logger *zap.Logger
tracer opentracing.Tracer
}

// NewAPIHandler returns an APIHandler
func NewAPIHandler(spanReader spanstore.Reader, dependencyReader dependencystore.Reader, options ...HandlerOption) *APIHandler {
func NewAPIHandler(queryService *querysvc.QueryService, options ...HandlerOption) *APIHandler {
aH := &APIHandler{
spanReader: spanReader,
dependencyReader: dependencyReader,
queryService: queryService,
queryParser: queryParser{
traceQueryLookbackDuration: defaultTraceQueryLookbackDuration,
timeNow: time.Now,
Expand All @@ -107,12 +97,6 @@ func NewAPIHandler(spanReader spanstore.Reader, dependencyReader dependencystore
if aH.apiPrefix == "" {
aH.apiPrefix = defaultAPIPrefix
}
if aH.adjuster == nil {
aH.adjuster = adjuster.Sequence(StandardAdjusters...)
}
if aH.logger == nil {
aH.logger = zap.NewNop()
}
if aH.tracer == nil {
aH.tracer = opentracing.NoopTracer{}
}
Expand Down Expand Up @@ -154,7 +138,7 @@ func (aH *APIHandler) route(route string, args ...interface{}) string {
}

func (aH *APIHandler) getServices(w http.ResponseWriter, r *http.Request) {
services, err := aH.spanReader.GetServices(r.Context())
services, err := aH.queryService.GetServices(r.Context())
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
Expand All @@ -169,7 +153,7 @@ func (aH *APIHandler) getOperationsLegacy(w http.ResponseWriter, r *http.Request
vars := mux.Vars(r)
// given how getOperationsLegacy is bound to URL route, serviceParam cannot be empty
service, _ := url.QueryUnescape(vars[serviceParam])
operations, err := aH.spanReader.GetOperations(r.Context(), service)
operations, err := aH.queryService.GetOperations(r.Context(), service)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
Expand All @@ -187,7 +171,7 @@ func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) {
return
}
}
operations, err := aH.spanReader.GetOperations(r.Context(), service)
operations, err := aH.queryService.GetOperations(r.Context(), service)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
Expand All @@ -212,7 +196,7 @@ func (aH *APIHandler) search(w http.ResponseWriter, r *http.Request) {
return
}
} else {
tracesFromStorage, err = aH.spanReader.FindTraces(r.Context(), &tQuery.TraceQueryParameters)
tracesFromStorage, err = aH.queryService.FindTraces(r.Context(), &tQuery.TraceQueryParameters)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
Expand All @@ -238,7 +222,7 @@ func (aH *APIHandler) tracesByIDs(ctx context.Context, traceIDs []model.TraceID)
var errors []structuredError
retMe := make([]*model.Trace, 0, len(traceIDs))
for _, traceID := range traceIDs {
if trace, err := trace(ctx, traceID, aH.spanReader, aH.archiveSpanReader); err != nil {
if trace, err := aH.queryService.GetTrace(ctx, traceID); err != nil {
if err != spanstore.ErrTraceNotFound {
return nil, nil, err
}
Expand Down Expand Up @@ -272,7 +256,7 @@ func (aH *APIHandler) dependencies(w http.ResponseWriter, r *http.Request) {
}
endTs := time.Unix(0, 0).Add(time.Duration(endTsMillis) * time.Millisecond)

dependencies, err := aH.dependencyReader.GetDependencies(endTs, lookback)
dependencies, err := aH.queryService.GetDependencies(endTs, lookback)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
Expand All @@ -288,7 +272,7 @@ func (aH *APIHandler) convertModelToUI(trace *model.Trace, adjust bool) (*ui.Tra
var errors []error
if adjust {
var err error
trace, err = aH.adjuster.Adjust(trace)
trace, err = aH.queryService.Adjust(trace)
if err != nil {
errors = append(errors, err)
}
Expand Down Expand Up @@ -353,7 +337,7 @@ func (aH *APIHandler) parseTraceID(w http.ResponseWriter, r *http.Request) (mode

// getTrace implements the REST API /traces/{trace-id}
func (aH *APIHandler) getTrace(w http.ResponseWriter, r *http.Request) {
aH.getTraceFromReaders(w, r, aH.spanReader, aH.archiveSpanReader)
aH.getTraceFromReaders(w, r, aH.queryService)
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
}

// getTraceFromReader parses trace ID from the path, loads the trace from specified Reader,
Expand All @@ -362,96 +346,61 @@ func (aH *APIHandler) getTraceFromReaders(
w http.ResponseWriter,
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
r *http.Request,
reader spanstore.Reader,
backupReader spanstore.Reader,
) {
aH.withTraceFromReader(w, r, reader, backupReader, func(trace *model.Trace) {
var uiErrors []structuredError
uiTrace, uiErr := aH.convertModelToUI(trace, shouldAdjust(r))
if uiErr != nil {
uiErrors = append(uiErrors, *uiErr)
}

structuredRes := structuredResponse{
Data: []*ui.Trace{
uiTrace,
},
Errors: uiErrors,
}
aH.writeJSON(w, r, &structuredRes)
})
}

func shouldAdjust(r *http.Request) bool {
raw := r.FormValue("raw")
isRaw, _ := strconv.ParseBool(raw)
return !isRaw
}

// withTraceFromReader tries to load a trace from Reader and if successful
// execute process() function passing it that trace.
func (aH *APIHandler) withTraceFromReader(
w http.ResponseWriter,
r *http.Request,
reader spanstore.Reader,
backupReader spanstore.Reader,
process func(trace *model.Trace),
) {
traceID, ok := aH.parseTraceID(w, r)
if !ok {
return
}
trace, err := trace(r.Context(), traceID, reader, backupReader)
trace, err := reader.GetTrace(r.Context(), traceID)
if err == spanstore.ErrTraceNotFound {
aH.handleError(w, err, http.StatusNotFound)
return
}
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
process(trace)
}

func trace(
ctx context.Context,
traceID model.TraceID,
reader spanstore.Reader,
backupReader spanstore.Reader,
) (*model.Trace, error) {
trace, err := reader.GetTrace(ctx, traceID)
if err == spanstore.ErrTraceNotFound {
if backupReader == nil {
return nil, err
}
trace, err = backupReader.GetTrace(ctx, traceID)
var uiErrors []structuredError
uiTrace, uiErr := aH.convertModelToUI(trace, shouldAdjust(r))
if uiErr != nil {
uiErrors = append(uiErrors, *uiErr)
}
return trace, err

structuredRes := structuredResponse{
Data: []*ui.Trace{
uiTrace,
},
Errors: uiErrors,
}
aH.writeJSON(w, r, &structuredRes)
}

func shouldAdjust(r *http.Request) bool {
raw := r.FormValue("raw")
isRaw, _ := strconv.ParseBool(raw)
return !isRaw
}

// archiveTrace implements the REST API POST:/archive/{trace-id}.
// It reads the trace from the main Reader and saves it to archive Writer.
// It passes the traceID to queryService.ArchiveTrace for writing.
func (aH *APIHandler) archiveTrace(w http.ResponseWriter, r *http.Request) {
if aH.archiveSpanWriter == nil {
aH.handleError(w, errNoArchiveSpanStorage, http.StatusInternalServerError)
traceID, ok := aH.parseTraceID(w, r)
if !ok {
return
}
aH.withTraceFromReader(w, r, aH.spanReader, nil, func(trace *model.Trace) {
var writeErrors []error
for _, span := range trace.Spans {
err := aH.archiveSpanWriter.WriteSpan(span)
if err != nil {
writeErrors = append(writeErrors, err)
}
}
err := multierror.Wrap(writeErrors)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
structuredRes := structuredResponse{
Data: []string{}, // doens't matter, just want an empty array
Errors: []structuredError{},
}
aH.writeJSON(w, r, &structuredRes)
})

// QueryService.ArchiveTrace can now archive this traceID.
err := aH.queryService.ArchiveTrace(r.Context(), &traceID)
if err != nil {
aH.handleError(w, err, http.StatusInternalServerError)
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
return
}

structuredRes := structuredResponse{
Data: []string{}, // doens't matter, just want an empty array
Errors: []structuredError{},
}
aH.writeJSON(w, r, &structuredRes)
}

func (aH *APIHandler) handleError(w http.ResponseWriter, err error, statusCode int) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/model/adjuster"
ui "github.com/jaegertracing/jaeger/model/json"
Expand Down Expand Up @@ -294,7 +295,7 @@ func TestGetTraceNotFound(t *testing.T) {

func TestGetTraceAdjustmentFailure(t *testing.T) {
server, readMock, _, _ := initializeTestServerWithHandler(
HandlerOptions.Adjusters(
querysvc.QueryServiceOptions.Adjusters(
adjuster.Func(func(trace *model.Trace) (*model.Trace, error) {
return trace, errAdjustment
}),
Expand Down Expand Up @@ -347,7 +348,7 @@ func TestSearchByTraceIDSuccess(t *testing.T) {

func TestSearchByTraceIDSuccessWithArchive(t *testing.T) {
archiveReadMock := &spanstoremocks.Reader{}
server, readMock, _ := initializeTestServer(HandlerOptions.ArchiveSpanReader(archiveReadMock))
server, readMock, _ := initializeTestServer(querysvc.QueryServiceOptions.ArchiveSpanReader(archiveReadMock))
defer server.Close()
readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, spanstore.ErrTraceNotFound).Twice()
Expand Down Expand Up @@ -388,7 +389,7 @@ func TestSearchByTraceIDFailure(t *testing.T) {

func TestSearchModelConversionFailure(t *testing.T) {
server, readMock, _, _ := initializeTestServerWithOptions(
HandlerOptions.Adjusters(
querysvc.QueryServiceOptions.Adjusters(
adjuster.Func(func(trace *model.Trace) (*model.Trace, error) {
return trace, errAdjustment
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package app
package querysvc

import (
"github.com/jaegertracing/jaeger/model/adjuster"
Expand Down
Loading