Skip to content

Update GRPC #6808

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ require (
github.com/docker/go-units v0.5.0 // indirect
github.com/edsrzf/mmap-go v1.2.0 // indirect
github.com/efficientgo/tools/extkingpin v0.0.0-20230505153745-6b7392939a60 // indirect
github.com/envoyproxy/go-control-plane v0.12.0 // indirect
github.com/fatih/color v1.18.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
Expand Down Expand Up @@ -300,9 +299,6 @@ replace github.com/google/gnostic => github.com/googleapis/gnostic v0.6.9
// https://github.com/thanos-io/thanos/blob/fdeea3917591fc363a329cbe23af37c6fff0b5f0/go.mod#L265
replace gopkg.in/alecthomas/kingpin.v2 => github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497

// gRPC 1.66 introduced memory pooling which breaks Cortex queries. Pin 1.65.0 until we have a fix.
replace google.golang.org/grpc => google.golang.org/grpc v1.65.0

replace github.com/thanos-io/objstore => github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97

replace github.com/prometheus/prometheus => github.com/prometheus/prometheus v0.302.1
Expand Down
1,167 changes: 66 additions & 1,101 deletions go.sum

Large diffs are not rendered by default.

288 changes: 288 additions & 0 deletions integration/grpc_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
//go:build requires_docker
// +build requires_docker

package integration

import (
"context"
"flag"
"fmt"
"io"
"math/rand"
"net"
"strconv"
"sync"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/server"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/distributor/distributorpb"
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
)

type mockGprcServer struct {
ingester_client.IngesterServer
}

func (m mockGprcServer) QueryStream(req *ingester_client.QueryRequest, streamServer ingester_client.Ingester_QueryStreamServer) error {
md, _ := metadata.FromIncomingContext(streamServer.Context())
i, _ := strconv.Atoi(md["i"][0])
return streamServer.Send(createStreamResponse(i))
}

func (m mockGprcServer) PushStream(srv ingester_client.Ingester_PushStreamServer) error {
for {
req, err := srv.Recv()
if err == io.EOF {
return nil
}
ctx := metadata.NewIncomingContext(srv.Context(), metadata.MD{"i": []string{req.TenantID}})
res, err := m.Push(ctx, req.Request)
req.Free()
if err != nil {
return err
}
err = srv.Send(res)
if err != nil {
return err
}
}
}

func (m mockGprcServer) Push(ctx context.Context, request *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
defer request.Free()
time.Sleep(time.Duration(rand.Int31n(100)) * time.Millisecond)
md, _ := metadata.FromIncomingContext(ctx)
i, _ := strconv.Atoi(md["i"][0])
expected := createRequest(i)
// Need to do this so the .String method return the same value for MessageWithBufRef
expected.MessageWithBufRef = request.MessageWithBufRef

if expected.String() != request.String() {
return nil, fmt.Errorf("expected %v, got %v", expected, request)
}
return &cortexpb.WriteResponse{}, nil
}

func run(t *testing.T, cfg server.Config, register func(s *grpc.Server), validate func(t *testing.T, con *grpc.ClientConn)) {
savedRegistry := prometheus.DefaultRegisterer
prometheus.DefaultRegisterer = prometheus.NewRegistry()
defer func() {
prometheus.DefaultRegisterer = savedRegistry
}()

grpcPort, closeGrpcPort, err := getLocalHostPort()
require.NoError(t, err)
httpPort, closeHTTPPort, err := getLocalHostPort()
require.NoError(t, err)

err = closeGrpcPort()
require.NoError(t, err)
err = closeHTTPPort()
require.NoError(t, err)

cfg.HTTPListenPort = httpPort
cfg.GRPCListenPort = grpcPort

serv, err := server.New(cfg)
require.NoError(t, err)
register(serv.GRPC)

go func() {
err := serv.Run()
require.NoError(t, err)
}()

defer serv.Shutdown()

grpcHost := fmt.Sprintf("localhost:%d", grpcPort)

clientConfig := grpcclient.Config{}
clientConfig.RegisterFlags(flag.NewFlagSet("fake", flag.ContinueOnError))

dialOptions, err := clientConfig.DialOption(nil, nil)
assert.NoError(t, err)
dialOptions = append([]grpc.DialOption{grpc.WithDefaultCallOptions(clientConfig.CallOptions()...)}, dialOptions...)

conn, err := grpc.NewClient(grpcHost, dialOptions...)
assert.NoError(t, err)
validate(t, conn)
}

func TestConcurrentGrpcCalls(t *testing.T) {
cfg := server.Config{}
(&cfg).RegisterFlags(flag.NewFlagSet("fake", flag.ContinueOnError))

tc := map[string]struct {
cfg server.Config
register func(s *grpc.Server)
validate func(t *testing.T, con *grpc.ClientConn)
}{
"distributor": {
cfg: cfg,
register: func(s *grpc.Server) {
d := &mockGprcServer{}
distributorpb.RegisterDistributorServer(s, d)
},
validate: func(t *testing.T, conn *grpc.ClientConn) {
client := distributorpb.NewDistributorClient(conn)
wg := sync.WaitGroup{}
n := 10000
wg.Add(n)
for i := 0; i < n; i++ {
go func(i int) {
defer wg.Done()
ctx := context.Background()
ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(i)}})
_, err := client.Push(ctx, createRequest(i))
require.NoError(t, err)
}(i)
}

wg.Wait()
},
},
"distributor push stream": {
cfg: cfg,
register: func(s *grpc.Server) {
d := &mockGprcServer{}
ingester_client.RegisterIngesterServer(s, d)
},
validate: func(t *testing.T, conn *grpc.ClientConn) {
ctx := context.Background()
client := ingester_client.NewIngesterClient(conn)
wg := sync.WaitGroup{}
n := 10000
wg.Add(n)
for i := 0; i < n; i++ {
go func(i int) {
defer wg.Done()
stream, err := client.PushStream(ctx)
require.NoError(t, err)

ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(i)}})
err = stream.Send(&cortexpb.StreamWriteRequest{TenantID: strconv.Itoa(i), Request: createRequest(i)})
require.NoError(t, err)
_, err = stream.Recv()
require.NoError(t, err)
//err = stream.Send(&cortexpb.StreamWriteRequest{"i", createRequest(i + 1)})
//require.NoError(t, err)
require.NoError(t, stream.CloseSend())
}(i)
}

wg.Wait()
},
},
"ingester": {
cfg: cfg,
register: func(s *grpc.Server) {
d := &mockGprcServer{}
ingester_client.RegisterIngesterServer(s, d)
},
validate: func(t *testing.T, conn *grpc.ClientConn) {
client := ingester_client.NewIngesterClient(conn)
wg := sync.WaitGroup{}
n := 10000
wg.Add(n)
for i := 0; i < n; i++ {
go func(i int) {
defer wg.Done()
ctx := context.Background()
ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"i": []string{strconv.Itoa(i)}})
s, err := client.QueryStream(ctx, &ingester_client.QueryRequest{})
require.NoError(t, err)
resp, err := s.Recv()
require.NoError(t, err)
expected := createStreamResponse(i)
require.Equal(t, expected.String(), resp.String())
}(i)
}

wg.Wait()
},
},
}

for name, c := range tc {
t.Run(name, func(t *testing.T) {
run(t, c.cfg, c.register, c.validate)
})
}
}

func createStreamResponse(i int) *ingester_client.QueryStreamResponse {
return &ingester_client.QueryStreamResponse{Chunkseries: []ingester_client.TimeSeriesChunk{
{
FromIngesterId: strconv.Itoa(i),
Labels: createLabels(i),
Chunks: []ingester_client.Chunk{
{
StartTimestampMs: int64(i),
EndTimestampMs: int64(i),
Encoding: int32(i),
Data: []byte(strconv.Itoa(i)),
},
},
},
}}
}

func createRequest(i int) *cortexpb.WriteRequest {
labels := createLabels(i)
return &cortexpb.WriteRequest{
Timeseries: []cortexpb.PreallocTimeseries{
{
TimeSeries: &cortexpb.TimeSeries{
Labels: labels,
Samples: []cortexpb.Sample{
{TimestampMs: int64(i), Value: float64(i)},
},
Exemplars: []cortexpb.Exemplar{
{
Labels: labels,
Value: float64(i),
TimestampMs: int64(i),
},
},
},
},
},
}
}

func createLabels(i int) []cortexpb.LabelAdapter {
labels := make([]cortexpb.LabelAdapter, 0, 100)
for j := 0; j < 100; j++ {
labels = append(labels, cortexpb.LabelAdapter{
Name: fmt.Sprintf("test%d_%d", i, j),
Value: fmt.Sprintf("test%d_%d", i, j),
})
}
return labels
}

func getLocalHostPort() (int, func() error, error) {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return 0, nil, err
}

l, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, nil, err
}

closePort := func() error {
return l.Close()
}
return l.Addr().(*net.TCPAddr).Port, closePort, nil
}
Loading