benchmark: add a feature for the stream count #5898

Closed · wants to merge 1 commit
17 changes: 14 additions & 3 deletions benchmark/benchmain/main.go
@@ -96,6 +96,7 @@ var (
readRespSizeBytes = flags.IntSlice("respSizeBytes", nil, "Response size in bytes - may be a comma-separated list")
reqPayloadCurveFiles = flags.StringSlice("reqPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape a random distribution of request payload sizes")
respPayloadCurveFiles = flags.StringSlice("respPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape a random distribution of response payload sizes")
+	streamCounts = flags.IntSlice("streamCounts", nil, "Number of requests and responses exchanged on a single stream - may be a comma-separated list")
benchTime = flag.Duration("benchtime", time.Second, "Configures the amount of time to run each benchmark")
memProfile = flag.String("memProfile", "", "Enables memory profiling output to the filename provided.")
memProfileRate = flag.Int("memProfileRate", 512*1024, "Configures the memory profiling rate. \n"+
@@ -151,6 +152,7 @@ var (
defaultMaxConcurrentCalls = []int{1, 8, 64, 512}
defaultReqSizeBytes = []int{1, 1024, 1024 * 1024}
defaultRespSizeBytes = []int{1, 1024, 1024 * 1024}
+	defaultStreamCounts = []int{1, 8}
networks = map[string]latency.Network{
networkModeLocal: latency.Local,
networkModeLAN: latency.LAN,
@@ -384,13 +386,14 @@ func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
return func(pos int) {
reqSizeBytes := bf.ReqSizeBytes
respSizeBytes := bf.RespSizeBytes
+	streamCount := bf.StreamCount
if bf.ReqPayloadCurve != nil {
reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom()
}
if bf.RespPayloadCurve != nil {
respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
}
-	streamCaller(streams[pos], reqSizeBytes, respSizeBytes)
+	streamCaller(streams[pos], reqSizeBytes, respSizeBytes, streamCount)
}, cleanup
}

@@ -455,8 +458,8 @@ func unaryCaller(client testgrpc.BenchmarkServiceClient, reqSize, respSize int) {
}
}

-func streamCaller(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
-	if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
+func streamCaller(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize, streamCount int) {
+	if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize, streamCount); err != nil {
logger.Fatalf("DoStreamingRoundTrip failed: %v", err)
}
}
@@ -522,6 +525,7 @@ type featureOpts struct {
respSizeBytes []int
reqPayloadCurves []*stats.PayloadCurve
respPayloadCurves []*stats.PayloadCurve
+	streamCounts []int
compModes []string
enableChannelz []bool
enablePreloader []bool
@@ -559,6 +563,8 @@ func makeFeaturesNum(b *benchOpts) []int {
featuresNum[i] = len(b.features.reqPayloadCurves)
case stats.RespPayloadCurveIndex:
featuresNum[i] = len(b.features.respPayloadCurves)
+	case stats.StreamCountsIndex:
+		featuresNum[i] = len(b.features.streamCounts)
case stats.CompModesIndex:
featuresNum[i] = len(b.features.compModes)
case stats.EnableChannelzIndex:
@@ -632,6 +638,7 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
Kbps: b.features.readKbps[curPos[stats.ReadKbpsIndex]],
MTU: b.features.readMTU[curPos[stats.ReadMTUIndex]],
MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[stats.MaxConcurrentCallsIndex]],
+	StreamCount: b.features.streamCounts[curPos[stats.StreamCountsIndex]],
ModeCompressor: b.features.compModes[curPos[stats.CompModesIndex]],
EnableChannelz: b.features.enableChannelz[curPos[stats.EnableChannelzIndex]],
EnablePreloader: b.features.enablePreloader[curPos[stats.EnablePreloaderIndex]],
@@ -702,6 +709,7 @@ func processFlags() *benchOpts {
maxConcurrentCalls: append([]int(nil), *maxConcurrentCalls...),
reqSizeBytes: append([]int(nil), *readReqSizeBytes...),
respSizeBytes: append([]int(nil), *readRespSizeBytes...),
+	streamCounts: append([]int(nil), *streamCounts...),
compModes: setCompressorMode(*compressorMode),
enableChannelz: setToggleMode(*channelzOn),
enablePreloader: setToggleMode(*preloaderMode),
@@ -746,6 +754,9 @@
}
opts.features.respSizeBytes = nil
}
+	if len(opts.features.streamCounts) == 0 {
+		opts.features.streamCounts = defaultStreamCounts
+	}

// Re-write latency, kbps and mtu if network mode is set.
if network, ok := networks[opts.networkMode]; ok {
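For reference, a hypothetical invocation exercising the new flag could look like this (the other flags already exist in benchmain and the values are illustrative; when -streamCounts is omitted, defaultStreamCounts of {1, 8} applies):

```sh
go run benchmark/benchmain/main.go -workloads=streaming \
    -reqSizeBytes=1024 -respSizeBytes=1024 -streamCounts=1,8 -benchtime=10s
```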
59 changes: 40 additions & 19 deletions benchmark/benchmark.go
@@ -27,6 +27,7 @@ import (
"io"
"log"
"net"
"sync"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -232,40 +233,60 @@ func DoUnaryCall(tc testgrpc.BenchmarkServiceClient, reqSize, respSize int) error {
}

// DoStreamingRoundTrip performs a round trip for a single streaming rpc.
-func DoStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
+func DoStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize, streamCount int) error {
Contributor:

Sorry, I think I missed this in my previous iteration. The way this is implemented currently, it creates a single stream and performs multiple sends and receives on the same stream. What I believe you are trying to add is to actually create multiple streams. In that case, we would have to change the signature of DoStreamingRoundTrip() to accept a grpc.ClientConnInterface instead of a testgrpc.BenchmarkService_StreamingCallClient and create streamCount number of streams inside this function (from goroutines), and perform sends and receives on them.

Also, the Send() and Receive() can be inside the same goroutine instead of the latter being outside the goroutine.
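For concreteness, the suggested variant might look roughly like this (a hypothetical sketch, not part of this PR: doMultiStreamRoundTrip is an assumed name, and it reuses context, sync, grpc, and testgrpc as imported elsewhere in this file):

```go
// Hypothetical sketch: open streamCount streams on one connection and
// run the sends and receives for each stream in its own goroutine.
func doMultiStreamRoundTrip(cc grpc.ClientConnInterface, reqSize, respSize, streamCount int) error {
	client := testgrpc.NewBenchmarkServiceClient(cc)
	errCh := make(chan error, streamCount) // buffered so goroutines never block
	var wg sync.WaitGroup
	for i := 0; i < streamCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			stream, err := client.StreamingCall(context.Background())
			if err != nil {
				errCh <- err
				return
			}
			// Send() and Recv() happen in the same goroutine.
			if err := DoStreamingRoundTrip(stream, reqSize, respSize, 1); err != nil {
				errCh <- err
			}
		}()
	}
	wg.Wait()
	close(errCh)
	return <-errCh // first recorded error, or nil if the channel is empty
}
```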

Contributor Author:

I would like to create a single stream and perform multiple sends and receives. The current implementation is sufficient for my needs.

Contributor:

We have three modes of operation for the benchmark client, controlled by the -workloads flag, and in each of these modes, -maxConcurrentCalls number of streams are created upfront:

  • Unary: one goroutine per stream is created, and one RPC is done at a time.
  • Streaming: one goroutine per stream is created, and one Send() and Recv() is done at a time.
  • Unconstrained: two goroutines per stream are created, one for sending and one for receiving, and the two send and recv independently of each other (see the sketch below).

Is the unconstrained mode any different from what you are trying to do here?
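For illustration, the unconstrained pattern described above is roughly the following (a minimal sketch, not the actual worker code; the function name and the done channel are assumptions):

```go
// Sketch of unconstrained mode: one sender and one receiver goroutine
// per stream, each looping independently of the other.
func runUnconstrained(stream testgrpc.BenchmarkService_StreamingCallClient, req *testpb.SimpleRequest, done <-chan struct{}) {
	go func() { // sender
		for {
			select {
			case <-done:
				return
			default:
			}
			if err := stream.Send(req); err != nil {
				return
			}
		}
	}()
	go func() { // receiver
		for {
			if _, err := stream.Recv(); err != nil {
				return
			}
		}
	}()
}
```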

Contributor Author:

> Is the unconstrained mode any different from what you are trying to do here?

It's almost the same. The unconstrained mode can accomplish what I am attempting to do here.

 	pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
 	req := &testpb.SimpleRequest{
 		ResponseType: pl.Type,
 		ResponseSize: int32(respSize),
 		Payload:      pl,
 	}
-	if err := stream.Send(req); err != nil {
-		return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err)
-	}
-	if _, err := stream.Recv(); err != nil {
-		// EOF is a valid error here.
-		if err == io.EOF {
-			return nil
-		}
-		return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err)
-	}
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < streamCount; i++ {
+			if err := stream.Send(req); err != nil {
+				log.Fatalf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err)
+			}
+		}
+	}()
+	for i := 0; i < streamCount; i++ {
+		if _, err := stream.Recv(); err != nil {
+			// EOF is a valid error here.
+			if err == io.EOF {
+				return nil
+			}
+			return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err)
+		}
+	}
+	wg.Wait()
 	return nil
 }

 // DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer.
-func DoByteBufStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
-	out := make([]byte, reqSize)
-	if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
-		return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err)
-	}
+func DoByteBufStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize, streamCount int) error {
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		out := make([]byte, reqSize)
+		for i := 0; i < streamCount; i++ {
+			if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
+				log.Fatalf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err)
+			}
+		}
+	}()
 	var in []byte
-	if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil {
-		// EOF is a valid error here.
-		if err == io.EOF {
-			return nil
-		}
-		return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err)
-	}
+	for i := 0; i < streamCount; i++ {
+		if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil {
+			// EOF is a valid error here.
+			if err == io.EOF {
+				return nil
+			}
+			return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err)
+		}
+	}
+	wg.Wait()
 	return nil
 }
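As a usage sketch (hypothetical; conn is an assumed, already-dialed *grpc.ClientConn, and the helper is imported as benchmark by external callers), the updated round-trip function is driven like this:

```go
// Open one streaming call and perform 8 request/response exchanges on it.
stream, err := testgrpc.NewBenchmarkServiceClient(conn).StreamingCall(context.Background())
if err != nil {
	log.Fatalf("StreamingCall(_) = _, %v", err)
}
if err := benchmark.DoStreamingRoundTrip(stream, 1024, 1024, 8); err != nil {
	log.Fatalf("DoStreamingRoundTrip failed: %v", err)
}
```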

3 changes: 3 additions & 0 deletions benchmark/stats/stats.go
@@ -49,6 +49,7 @@ const (
RespSizeBytesIndex
ReqPayloadCurveIndex
RespPayloadCurveIndex
+	StreamCountsIndex
CompModesIndex
EnableChannelzIndex
EnablePreloaderIndex
@@ -107,6 +108,8 @@ type Features struct {
// RespPayloadCurve is a histogram representing the shape a random
// distribution response payloads should take.
RespPayloadCurve *PayloadCurve
+	// StreamCount is the number of requests and responses exchanged on a single stream.
+	StreamCount int
// ModeCompressor represents the compressor mode used.
ModeCompressor string
// EnableChannelz indicates if channelz was turned on.
8 changes: 4 additions & 4 deletions benchmark/worker/benchmark_client.go
@@ -201,7 +201,7 @@ func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benchmarkClient) error {
bc.doCloseLoopUnary(conns, rpcCountPerConn, payloadReqSize, payloadRespSize)
// TODO open loop.
case testpb.RpcType_STREAMING:
-	bc.doCloseLoopStreaming(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType)
+	bc.doCloseLoopStreaming(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, 1, payloadType)
// TODO open loop.
default:
return status.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType)
@@ -289,8 +289,8 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int) {
}
}

-func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string) {
-	var doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error
+func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, streamCount int, payloadType string) {
+	var doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int, int) error
if payloadType == "bytebuf" {
doRPC = benchmark.DoByteBufStreamingRoundTrip
} else {
@@ -315,7 +315,7 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, streamCount int, payloadType string) {
// before starting benchmark.
for {
start := time.Now()
-	if err := doRPC(stream, reqSize, respSize); err != nil {
+	if err := doRPC(stream, reqSize, respSize, streamCount); err != nil {
return
}
elapse := time.Since(start)
Expand Down