Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Thrift-gen with Proto-gen types for sampling strategies #4181

Merged
merged 2 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/agent/app/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ import (
"github.com/jaegertracing/jaeger/internal/metrics/fork"
"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

Expand Down Expand Up @@ -203,7 +203,7 @@ func (fakeCollectorProxy) Close() error {
return nil
}

func (f fakeCollectorProxy) GetSamplingStrategy(_ context.Context, _ string) (*sampling.SamplingStrategyResponse, error) {
func (f fakeCollectorProxy) GetSamplingStrategy(_ context.Context, _ string) (*api_v2.SamplingStrategyResponse, error) {
return nil, errors.New("no peers available")
}

Expand Down
20 changes: 7 additions & 13 deletions cmd/agent/app/configmanager/grpc/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,28 @@ import (

"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/model/converter/thrift/jaeger"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

// SamplingManager returns sampling decisions from collector over gRPC.
type SamplingManager struct {
// ConfigManagerProxy returns sampling decisions from collector over gRPC.
type ConfigManagerProxy struct {
client api_v2.SamplingManagerClient
}

// NewConfigManager creates gRPC sampling manager.
func NewConfigManager(conn *grpc.ClientConn) *SamplingManager {
return &SamplingManager{
func NewConfigManager(conn *grpc.ClientConn) *ConfigManagerProxy {
return &ConfigManagerProxy{
client: api_v2.NewSamplingManagerClient(conn),
}
}

// GetSamplingStrategy returns sampling strategies from collector.
func (s *SamplingManager) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
r, err := s.client.GetSamplingStrategy(ctx, &api_v2.SamplingStrategyParameters{ServiceName: serviceName})
if err != nil {
return nil, err
}
return jaeger.ConvertSamplingResponseFromDomain(r)
func (s *ConfigManagerProxy) GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) {
return s.client.GetSamplingStrategy(ctx, &api_v2.SamplingStrategyParameters{ServiceName: serviceName})
}

// GetBaggageRestrictions returns baggage restrictions from collector.
func (s *SamplingManager) GetBaggageRestrictions(_ context.Context, _ string) ([]*baggage.BaggageRestriction, error) {
func (s *ConfigManagerProxy) GetBaggageRestrictions(_ context.Context, _ string) ([]*baggage.BaggageRestriction, error) {
return nil, errors.New("baggage not implemented")
}
3 changes: 1 addition & 2 deletions cmd/agent/app/configmanager/grpc/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"google.golang.org/grpc/credentials/insecure"

"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

func close(t *testing.T, c io.Closer) {
Expand All @@ -44,7 +43,7 @@ func TestSamplingManager_GetSamplingStrategy(t *testing.T) {
manager := NewConfigManager(conn)
resp, err := manager.GetSamplingStrategy(context.Background(), "any")
require.NoError(t, err)
assert.Equal(t, &sampling.SamplingStrategyResponse{StrategyType: sampling.SamplingStrategyType_PROBABILISTIC}, resp)
assert.Equal(t, &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, resp)
}

func TestSamplingManager_GetSamplingStrategy_error(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions cmd/agent/app/configmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ package configmanager
import (
"context"

"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

// TODO this interface could be moved to pkg/clientcfg, along with grpc proxy,
// but not the metrics wrapper (because its metric names are specific to agent).

// ClientConfigManager decides:
// 1) which sampling strategy a given service should be using
// 2) which baggage restrictions a given service should be using.
type ClientConfigManager interface {
GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error)
GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error)
GetBaggageRestrictions(ctx context.Context, serviceName string) ([]*baggage.BaggageRestriction, error)
}
4 changes: 2 additions & 2 deletions cmd/agent/app/configmanager/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"context"

"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

// configManagerMetrics holds metrics related to ClientConfigManager
Expand Down Expand Up @@ -51,7 +51,7 @@ func WrapWithMetrics(manager ClientConfigManager, mFactory metrics.Factory) *Man
}

// GetSamplingStrategy returns sampling strategy from server.
func (m *ManagerWithMetrics) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
func (m *ManagerWithMetrics) GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) {
r, err := m.wrapped.GetSamplingStrategy(ctx, serviceName)
if err != nil {
m.metrics.SamplingFailures.Inc(1)
Expand Down
6 changes: 3 additions & 3 deletions cmd/agent/app/configmanager/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ import (
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

type noopManager struct{}

func (noopManager) GetSamplingStrategy(_ context.Context, s string) (*sampling.SamplingStrategyResponse, error) {
func (noopManager) GetSamplingStrategy(_ context.Context, s string) (*api_v2.SamplingStrategyResponse, error) {
if s == "failed" {
return nil, errors.New("failed")
}
return &sampling.SamplingStrategyResponse{StrategyType: sampling.SamplingStrategyType_PROBABILISTIC}, nil
return &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, nil
}

func (noopManager) GetBaggageRestrictions(_ context.Context, s string) ([]*baggage.BaggageRestriction, error) {
Expand Down
64 changes: 29 additions & 35 deletions cmd/all-in-one/all_in_one_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,21 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"testing"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

ui "github.com/jaegertracing/jaeger/model/json"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/proto-gen/api_v3"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

const (
Expand All @@ -57,17 +58,13 @@ var httpClient = &http.Client{

func TestAllInOne(t *testing.T) {
// Check if the query service is available
if err := healthCheck(); err != nil {
t.Fatal(err)
}
// Check if the favicon icon is available
if err := faviconCheck(); err != nil {
t.Fatal(err)
}
createTrace(t)
getAPITrace(t)
getSamplingStrategy(t)
getServicesAPIV3(t)
healthCheck(t)

t.Run("Check if the favicon icon is available", faviconCheck)
t.Run("createTrace", createTrace)
t.Run("getAPITrace", getAPITrace)
t.Run("getSamplingStrategy", getSamplingStrategy)
t.Run("getServicesAPIV3", getServicesAPIV3)
}

func createTrace(t *testing.T) {
Expand Down Expand Up @@ -113,43 +110,40 @@ func getSamplingStrategy(t *testing.T) {
req, err := http.NewRequest("GET", getSamplingStrategyURL, nil)
require.NoError(t, err)

var queryResponse sampling.SamplingStrategyResponse
resp, err := httpClient.Do(req)
require.NoError(t, err)

body, _ := io.ReadAll(resp.Body)

err = json.Unmarshal(body, &queryResponse)
var queryResponse api_v2.SamplingStrategyResponse
err = jsonpb.Unmarshal(bytes.NewReader(body), &queryResponse)
require.NoError(t, err)
resp.Body.Close()

assert.NotNil(t, queryResponse.ProbabilisticSampling)
assert.EqualValues(t, 1.0, queryResponse.ProbabilisticSampling.SamplingRate)
}

func healthCheck() error {
println("Health-checking all-in-one...")
for i := 0; i < 10; i++ {
if _, err := http.Get(queryURL); err == nil {
println("Health-check successful")
return nil
}
println("Health-check unsuccessful, waiting 1sec...")
time.Sleep(time.Second)
}
return fmt.Errorf("query service is not ready")
func healthCheck(t *testing.T) {
t.Log("Health-checking all-in-one...")
require.Eventuallyf(
t,
func() bool {
_, err := http.Get(queryURL)
return err == nil
},
10*time.Second,
time.Second,
"expecting query endpoint to be healhty",
)
}

func faviconCheck() error {
println("Checking favicon...")
func faviconCheck(t *testing.T) {
t.Log("Checking favicon...")
resp, err := http.Get(queryURL + "/favicon.ico")
if err == nil && resp.StatusCode == http.StatusOK {
println("Favicon check successful")
return nil
} else {
println("Favicon check failed")
return fmt.Errorf("all-in-one failed to serve favicon icon")
}
require.NoError(t, err)
require.NotNil(t, resp)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}

func getServicesAPIV3(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

var _ (io.Closer) = (*Collector)(nil)
Expand Down Expand Up @@ -122,8 +122,8 @@ func TestCollector_StartErrors(t *testing.T) {

type mockStrategyStore struct{}

func (m *mockStrategyStore) GetSamplingStrategy(_ context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
return &sampling.SamplingStrategyResponse{}, nil
func (m *mockStrategyStore) GetSamplingStrategy(_ context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) {
return &api_v2.SamplingStrategyResponse{}, nil
}

func TestCollector_PublishOpts(t *testing.T) {
Expand Down
7 changes: 1 addition & 6 deletions cmd/collector/app/sampling/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/model/converter/thrift/jaeger"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

Expand All @@ -36,9 +35,5 @@ func NewGRPCHandler(store strategystore.StrategyStore) GRPCHandler {

// GetSamplingStrategy returns sampling decision from store.
func (s GRPCHandler) GetSamplingStrategy(ctx context.Context, param *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) {
r, err := s.store.GetSamplingStrategy(ctx, param.GetServiceName())
if err != nil {
return nil, err
}
return jaeger.ConvertSamplingResponseToDomain(r)
return s.store.GetSamplingStrategy(ctx, param.GetServiceName())
}
5 changes: 2 additions & 3 deletions cmd/collector/app/sampling/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@ import (
"golang.org/x/net/context"

"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

type mockSamplingStore struct{}

func (s mockSamplingStore) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
func (s mockSamplingStore) GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) {
if serviceName == "error" {
return nil, errors.New("some error")
} else if serviceName == "nil" {
return nil, nil
}
return &sampling.SamplingStrategyResponse{StrategyType: sampling.SamplingStrategyType_PROBABILISTIC}, nil
return &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, nil
}

func TestNewGRPCHandler(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions cmd/collector/app/sampling/strategystore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import (
"context"
"io"

"github.com/jaegertracing/jaeger/thrift-gen/sampling"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

// StrategyStore keeps track of service specific sampling strategies.
type StrategyStore interface {
// GetSamplingStrategy retrieves the sampling strategy for the specified service.
GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error)
GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error)
}

// Aggregator defines an interface used to aggregate operation throughput.
Expand Down
4 changes: 2 additions & 2 deletions cmd/collector/app/server/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

type mockSamplingStore struct{}

func (s mockSamplingStore) GetSamplingStrategy(_ context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
func (s mockSamplingStore) GetSamplingStrategy(_ context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) {
return nil, nil
}

Expand Down
12 changes: 6 additions & 6 deletions crossdock/services/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
package services

import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/thrift-gen/sampling"
p2json "github.com/jaegertracing/jaeger/model/converter/json"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

var errSamplingRateMissing = errors.New("sampling rate is missing")
Expand Down Expand Up @@ -65,14 +65,14 @@ func (s *agentService) GetSamplingRate(service, operation string) (float64, erro
}
s.logger.Info("Retrieved sampling rates from agent", zap.String("body", string(body)))

var response sampling.SamplingStrategyResponse
if err = json.Unmarshal(body, &response); err != nil {
response, err := p2json.SamplingStrategyResponseFromJSON(body)
if err != nil {
return 0, err
}
return getSamplingRate(operation, &response)
return getSamplingRate(operation, response)
}

func getSamplingRate(operation string, response *sampling.SamplingStrategyResponse) (float64, error) {
func getSamplingRate(operation string, response *api_v2.SamplingStrategyResponse) (float64, error) {
if response.OperationSampling == nil {
return 0, errSamplingRateMissing
}
Expand Down
Loading