Skip to content

Commit

Permalink
Add optinal time window in TraceGetParameters
Browse files Browse the repository at this point in the history
Signed-off-by: rim99 <zhangxin@outlook.com>
  • Loading branch information
rim99 committed Nov 4, 2024
1 parent 772d84c commit cb5f24f
Show file tree
Hide file tree
Showing 49 changed files with 1,185 additions and 406 deletions.
7 changes: 4 additions & 3 deletions Makefile.Protobuf.mk
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
# instead of the go_package's declared by the imported protof files.
#

DOCKER_PROTOBUF_VERSION=0.5.0
DOCKER_PROTOBUF=jaegertracing/protobuf:$(DOCKER_PROTOBUF_VERSION)
PROTOC := docker run --rm -u ${shell id -u} -v${PWD}:${PWD} -w${PWD} ${DOCKER_PROTOBUF} --proto_path=${PWD}
CONTAINER=docker
CONTAINER_PROTOBUF_VERSION=0.5.0
CONTAINER_PROTOBUF=jaegertracing/protobuf:$(CONTAINER_PROTOBUF_VERSION)
PROTOC := ${CONTAINER} run --rm -u ${shell id -u} -v${PWD}:${PWD} -w${PWD} ${CONTAINER_PROTOBUF} --proto_path=${PWD}

PATCHED_OTEL_PROTO_DIR = proto-gen/.patched-otel-proto

Expand Down
14 changes: 14 additions & 0 deletions cmd/anonymizer/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type Options struct {
HashCustomTags bool
HashLogs bool
HashProcess bool
StartTime int64
EndTime int64
}

const (
Expand All @@ -28,6 +30,8 @@ const (
hashLogsFlag = "hash-logs"
hashProcessFlag = "hash-process"
maxSpansCount = "max-spans-count"
startTime = "start-time"
endTime = "end-time"
)

// AddFlags adds flags for anonymizer main program
Expand Down Expand Up @@ -72,6 +76,16 @@ func (o *Options) AddFlags(command *cobra.Command) {
maxSpansCount,
-1,
"The maximum number of spans to anonymize")
command.Flags().Int64Var(
&o.StartTime,
startTime,
-1,
"The start time of time window for searching trace, timestampe in unix microseconds")
command.Flags().Int64Var(
&o.EndTime,
endTime,
-1,
"The end time of time window for searching trace, timestampe in unix microseconds")

// mark traceid flag as mandatory
command.MarkFlagRequired(traceIDFlag)
Expand Down
19 changes: 16 additions & 3 deletions cmd/anonymizer/app/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"
"strings"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -49,15 +50,27 @@ func unwrapNotFoundErr(err error) error {
}

// QueryTrace queries for a trace and returns all spans inside it
func (q *Query) QueryTrace(traceID string) ([]model.Span, error) {
func (q *Query) QueryTrace(traceID string, startTime int64, endTime int64) ([]model.Span, error) {
mTraceID, err := model.TraceIDFromString(traceID)
if err != nil {
return nil, fmt.Errorf("failed to convert the provided trace id: %w", err)
}

stream, err := q.client.GetTrace(context.Background(), &api_v2.GetTraceRequest{
request := api_v2.GetTraceRequest{
TraceID: mTraceID,
})
}

if startTime != -1 {
t := time.UnixMicro(startTime)
request.StartTime = &t
}

if endTime != -1 {
t := time.UnixMicro(endTime)
request.EndTime = &t
}

stream, err := q.client.GetTrace(context.Background(), &request)
if err != nil {
return nil, unwrapNotFoundErr(err)
}
Expand Down
22 changes: 15 additions & 7 deletions cmd/anonymizer/app/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand All @@ -24,8 +25,8 @@ import (
)

var (
matchContext = mock.AnythingOfType("*context.valueCtx")
matchTraceID = mock.AnythingOfType("model.TraceID")
matchContext = mock.AnythingOfType("*context.valueCtx")
matchTraceGetParameters = mock.AnythingOfType("spanstore.TraceGetParameters")

mockInvalidTraceID = "xyz"
mockTraceID = model.NewTraceID(0, 123456)
Expand Down Expand Up @@ -106,24 +107,31 @@ func TestQueryTrace(t *testing.T) {
defer q.Close()

t.Run("No error", func(t *testing.T) {
s.spanReader.On("GetTrace", matchContext, matchTraceID).Return(
startTime := time.Date(1970, time.January, 1, 0, 0, 0, 1000, time.UTC)
endTime := time.Date(1970, time.January, 1, 0, 0, 0, 2000, time.UTC)
expectedTraceGetParameters := spanstore.TraceGetParameters{
TraceID: mockTraceID,
StartTime: &startTime,
EndTime: &endTime,
}
s.spanReader.On("GetTrace", matchContext, expectedTraceGetParameters).Return(
mockTraceGRPC, nil).Once()

spans, err := q.QueryTrace(mockTraceID.String())
spans, err := q.QueryTrace(mockTraceID.String(), 1, 2)
require.NoError(t, err)
assert.Equal(t, len(spans), len(mockTraceGRPC.Spans))
})

t.Run("Invalid TraceID", func(t *testing.T) {
_, err := q.QueryTrace(mockInvalidTraceID)
_, err := q.QueryTrace(mockInvalidTraceID, -1, -1)
assert.ErrorContains(t, err, "failed to convert the provided trace id")
})

t.Run("Trace not found", func(t *testing.T) {
s.spanReader.On("GetTrace", matchContext, matchTraceID).Return(
s.spanReader.On("GetTrace", matchContext, matchTraceGetParameters).Return(
nil, spanstore.ErrTraceNotFound).Once()

spans, err := q.QueryTrace(mockTraceID.String())
spans, err := q.QueryTrace(mockTraceID.String(), -1, -1)
assert.Nil(t, spans)
assert.ErrorIs(t, err, spanstore.ErrTraceNotFound)
})
Expand Down
2 changes: 1 addition & 1 deletion cmd/anonymizer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func main() {
logger.Fatal("error while creating query object", zap.Error(err))
}

spans, err := query.QueryTrace(options.TraceID)
spans, err := query.QueryTrace(options.TraceID, options.StartTime, options.EndTime)
if err != nil {
logger.Fatal("error while querying for trace", zap.Error(err))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage"
factoryMocks "github.com/jaegertracing/jaeger/storage/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

type mockStorageExt struct {
Expand Down Expand Up @@ -154,7 +155,7 @@ func TestExporter(t *testing.T) {
spanReader, err := storageFactory.CreateSpanReader()
require.NoError(t, err)
requiredTraceID := model.NewTraceID(0, 1) // 00000000000000000000000000000001
requiredTrace, err := spanReader.GetTrace(ctx, requiredTraceID)
requiredTrace, err := spanReader.GetTrace(ctx, spanstore.TraceGetParameters{TraceID: requiredTraceID})
require.NoError(t, err)
assert.Equal(t, spanID.String(), requiredTrace.Spans[0].SpanID.String())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
spanstoreMocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
)
Expand All @@ -32,7 +33,8 @@ func TestWriteTraces(t *testing.T) {
tdID := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID()
traceID, err := model.TraceIDFromBytes(tdID[:])
require.NoError(t, err)
trace, err := memstore.GetTrace(context.Background(), traceID)
query := spanstore.TraceGetParameters { TraceID: traceID }
trace, err := memstore.GetTrace(context.Background(), query)
require.NoError(t, err)
require.NotNil(t, trace)
assert.Len(t, trace.Spans, 1)
Expand Down
6 changes: 4 additions & 2 deletions cmd/jaeger/internal/integration/span_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ func unwrapNotFoundErr(err error) error {
return err
}

func (r *spanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
func (r *spanReader) GetTrace(ctx context.Context, query spanstore.TraceGetParameters) (*model.Trace, error) {
stream, err := r.client.GetTrace(ctx, &api_v2.GetTraceRequest{
TraceID: traceID,
TraceID: query.TraceID,
StartTime: query.StartTime,
EndTime: query.EndTime,
})
if err != nil {
return nil, unwrapNotFoundErr(err)
Expand Down
15 changes: 8 additions & 7 deletions cmd/query/app/apiv3/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ func parseResponse(t *testing.T, body []byte, obj gogoproto.Message) {
require.NoError(t, gogojsonpb.Unmarshal(bytes.NewBuffer(body), obj))
}

func makeTestTrace() (*model.Trace, model.TraceID) {
func makeTestTrace() (*model.Trace, spanstore.TraceGetParameters) {
traceID := model.NewTraceID(150, 160)
query := spanstore.TraceGetParameters{TraceID: traceID}
return &model.Trace{
Spans: []*model.Span{
{
Expand All @@ -94,7 +95,7 @@ func makeTestTrace() (*model.Trace, model.TraceID) {
},
},
},
}, traceID
}, query
}

func runGatewayTests(
Expand Down Expand Up @@ -140,18 +141,18 @@ func (gw *testGateway) runGatewayGetOperations(t *testing.T) {
}

func (gw *testGateway) runGatewayGetTrace(t *testing.T) {
trace, traceID := makeTestTrace()
gw.reader.On("GetTrace", matchContext, traceID).Return(trace, nil).Once()
gw.getTracesAndVerify(t, "/api/v3/traces/"+traceID.String(), traceID)
trace, query := makeTestTrace()
gw.reader.On("GetTrace", matchContext, query).Return(trace, nil).Once()
gw.getTracesAndVerify(t, "/api/v3/traces/"+query.TraceID.String(), query.TraceID)
}

func (gw *testGateway) runGatewayFindTraces(t *testing.T) {
trace, traceID := makeTestTrace()
trace, query := makeTestTrace()
q, qp := mockFindQueries()
gw.reader.
On("FindTraces", matchContext, qp).
Return([]*model.Trace{trace}, nil).Once()
gw.getTracesAndVerify(t, "/api/v3/traces?"+q.Encode(), traceID)
gw.getTracesAndVerify(t, "/api/v3/traces?"+q.Encode(), query.TraceID)
}

func (gw *testGateway) getTracesAndVerify(t *testing.T, url string, expectedTraceID model.TraceID) {
Expand Down
7 changes: 6 additions & 1 deletion cmd/query/app/apiv3/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ func (h *Handler) GetTrace(request *api_v3.GetTraceRequest, stream api_v3.QueryS
return fmt.Errorf("malform trace ID: %w", err)
}

trace, err := h.QueryService.GetTrace(stream.Context(), traceID)
query := spanstore.TraceGetParameters{
TraceID: traceID,
StartTime: request.GetStartTime(),
EndTime: request.GetEndTime(),
}
trace, err := h.QueryService.GetTrace(stream.Context(), query)
if err != nil {
return fmt.Errorf("cannot retrieve trace: %w", err)
}
Expand Down
25 changes: 19 additions & 6 deletions cmd/query/app/apiv3/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"net"
"testing"
"time"

"github.com/gogo/protobuf/types"
"github.com/stretchr/testify/assert"
Expand All @@ -26,8 +27,8 @@ import (
)

var (
matchContext = mock.AnythingOfType("*context.valueCtx")
matchTraceID = mock.AnythingOfType("model.TraceID")
matchContext = mock.AnythingOfType("*context.valueCtx")
matchTraceGetParameters = mock.AnythingOfType("spanstore.TraceGetParameters")
)

func newGrpcServer(t *testing.T, handler *Handler) (*grpc.Server, net.Addr) {
Expand Down Expand Up @@ -79,7 +80,17 @@ func newTestServerClient(t *testing.T) *testServerClient {

func TestGetTrace(t *testing.T) {
tsc := newTestServerClient(t)
tsc.reader.On("GetTrace", matchContext, matchTraceID).Return(
traceIdLiteral := "156"
traceId, _ := model.TraceIDFromString(traceIdLiteral)
startTime := time.Date(2020, time.January, 1, 13, 0, 0, 0, time.UTC)
endTime := time.Date(2020, time.January, 1, 14, 0, 0, 0, time.UTC)

expectedTraceGetParameters := spanstore.TraceGetParameters{
TraceID: traceId,
StartTime: &startTime,
EndTime: &endTime,
}
tsc.reader.On("GetTrace", matchContext, expectedTraceGetParameters).Return(
&model.Trace{
Spans: []*model.Span{
{
Expand All @@ -90,7 +101,9 @@ func TestGetTrace(t *testing.T) {

getTraceStream, err := tsc.client.GetTrace(context.Background(),
&api_v3.GetTraceRequest{
TraceId: "156",
TraceId: traceIdLiteral,
StartTime: &startTime,
EndTime: &endTime,
},
)
require.NoError(t, err)
Expand All @@ -104,7 +117,7 @@ func TestGetTrace(t *testing.T) {

func TestGetTraceStorageError(t *testing.T) {
tsc := newTestServerClient(t)
tsc.reader.On("GetTrace", matchContext, matchTraceID).Return(
tsc.reader.On("GetTrace", matchContext, matchTraceGetParameters).Return(
nil, fmt.Errorf("storage_error")).Once()

getTraceStream, err := tsc.client.GetTrace(context.Background(), &api_v3.GetTraceRequest{
Expand All @@ -119,7 +132,7 @@ func TestGetTraceStorageError(t *testing.T) {

func TestGetTraceTraceIDError(t *testing.T) {
tsc := newTestServerClient(t)
tsc.reader.On("GetTrace", matchContext, matchTraceID).Return(
tsc.reader.On("GetTrace", matchContext, matchTraceGetParameters).Return(
&model.Trace{
Spans: []*model.Span{},
}, nil).Once()
Expand Down
25 changes: 24 additions & 1 deletion cmd/query/app/apiv3/http_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (

const (
paramTraceID = "trace_id" // get trace by ID
paramStartTime = "start_time"
paramEndTime = "end_time"
paramServiceName = "query.service_name" // find traces
paramOperationName = "query.operation_name"
paramTimeMin = "query.start_time_min"
Expand Down Expand Up @@ -136,7 +138,28 @@ func (h *HTTPGateway) getTrace(w http.ResponseWriter, r *http.Request) {
if h.tryParamError(w, err, paramTraceID) {
return
}
trc, err := h.QueryService.GetTrace(r.Context(), traceID)

query := spanstore.TraceGetParameters {
TraceID: traceID,
}
http_query := r.URL.Query()
startTime := http_query.Get(paramStartTime)
if startTime != "" {
timeParsed, err := time.Parse(time.RFC3339Nano, startTime)
if h.tryParamError(w, err, paramStartTime) {
return
}
query.StartTime = &timeParsed
}
endTime := http_query.Get(paramEndTime)
if endTime != "" {
timeParsed, err := time.Parse(time.RFC3339Nano, endTime)
if h.tryParamError(w, err, paramEndTime) {
return
}
query.EndTime = &timeParsed
}
trc, err := h.QueryService.GetTrace(r.Context(), query)
if h.tryHandleError(w, err, http.StatusInternalServerError) {
return
}
Expand Down
Loading

0 comments on commit cb5f24f

Please sign in to comment.