
Commit a9c7942

benchmark: Add support for Poisson load in benchmark client (#6378)
1 parent dd350d0 commit a9c7942
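
This commit teaches the benchmark client to generate open-loop load in addition to the existing closed loop. In an open loop, RPC start times form a Poisson process with rate lambda (the configured offered load, in RPCs per second), so the gaps between starts are independent Exponential(lambda) samples with mean 1/lambda seconds; in a closed loop, the next RPC starts only after the previous one returns. A minimal standalone sketch of the sampling step, using the standard library's math/rand rather than the internal grpcrand wrapper this commit adds:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	lambda := 100.0 // offered load: 100 RPCs per second
	// ExpFloat64 returns an Exponential(1) sample; dividing by lambda
	// rescales it to Exponential(lambda), i.e. a mean wait of 1/lambda s.
	wait := time.Duration((rand.ExpFloat64() / lambda) * float64(time.Second))
	fmt.Println("next RPC starts in", wait) // ~10ms on average
}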

File tree: 2 files changed, +105 −45 lines


benchmark/worker/benchmark_client.go

Lines changed: 98 additions & 45 deletions
@@ -32,6 +32,7 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/internal/grpcrand"
 	"google.golang.org/grpc/internal/syscall"
 	"google.golang.org/grpc/status"
 	"google.golang.org/grpc/testdata"
@@ -185,11 +186,21 @@ func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benc
 		}
 	}
 
-	// TODO add open loop distribution.
-	switch config.LoadParams.Load.(type) {
+	// If set, perform an open loop, if not perform a closed loop. An open loop
+	// asynchronously starts RPCs based on random start times derived from a
+	// Poisson distribution. A closed loop performs RPCs in a blocking manner,
+	// and runs the next RPC after the previous RPC completes and returns.
+	var poissonLambda *float64
+	switch t := config.LoadParams.Load.(type) {
 	case *testpb.LoadParams_ClosedLoop:
 	case *testpb.LoadParams_Poisson:
-		return status.Errorf(codes.Unimplemented, "unsupported load params: %v", config.LoadParams)
+		if t.Poisson == nil {
+			return status.Errorf(codes.InvalidArgument, "poisson is nil, needs to be set")
+		}
+		if t.Poisson.OfferedLoad <= 0 {
+			return status.Errorf(codes.InvalidArgument, "poisson.offered is <= 0: %v, needs to be >0", t.Poisson.OfferedLoad)
+		}
+		poissonLambda = &t.Poisson.OfferedLoad
 	default:
 		return status.Errorf(codes.InvalidArgument, "unknown load params: %v", config.LoadParams)
 	}
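
For illustration, a worker ClientConfig that exercises the new branch might be built as below. This is a hedged sketch: the field names come from the grpc_testing proto package imported as testpb in this file, and PoissonParams is assumed to be the message wrapped by the LoadParams_Poisson oneof case used above.

cfg := &testpb.ClientConfig{
	RpcType: testpb.RpcType_UNARY,
	LoadParams: &testpb.LoadParams{
		Load: &testpb.LoadParams_Poisson{
			// OfferedLoad must be > 0, or performRPCs rejects the config
			// with codes.InvalidArgument (see the validation above).
			Poisson: &testpb.PoissonParams{OfferedLoad: 100},
		},
	},
}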
@@ -198,11 +209,9 @@ func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benc
 
 	switch config.RpcType {
 	case testpb.RpcType_UNARY:
-		bc.doCloseLoopUnary(conns, rpcCountPerConn, payloadReqSize, payloadRespSize)
-		// TODO open loop.
+		bc.unaryLoop(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, poissonLambda)
 	case testpb.RpcType_STREAMING:
-		bc.doCloseLoopStreaming(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType)
-		// TODO open loop.
+		bc.streamingLoop(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType, poissonLambda)
 	default:
 		return status.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType)
 	}
@@ -246,7 +255,7 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error)
 	return bc, nil
 }
 
-func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int) {
+func (bc *benchmarkClient) unaryLoop(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, poissonLambda *float64) {
 	for ic, conn := range conns {
 		client := testgrpc.NewBenchmarkServiceClient(conn)
 		// For each connection, create rpcCountPerConn goroutines to do rpc.
@@ -260,36 +269,44 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe
 			// Now relying on worker client to reserve time to do warm up.
 			// The worker client needs to wait for some time after client is created,
 			// before starting benchmark.
-			done := make(chan bool)
-			for {
-				go func() {
-					start := time.Now()
-					if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
+			if poissonLambda == nil { // Closed loop.
+				done := make(chan bool)
+				for {
+					go func() {
+						start := time.Now()
+						if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
+							select {
+							case <-bc.stop:
+							case done <- false:
+							}
+							return
+						}
+						elapse := time.Since(start)
+						bc.lockingHistograms[idx].add(int64(elapse))
 						select {
 						case <-bc.stop:
-						case done <- false:
+						case done <- true:
 						}
-						return
-					}
-					elapse := time.Since(start)
-					bc.lockingHistograms[idx].add(int64(elapse))
+					}()
 					select {
 					case <-bc.stop:
-					case done <- true:
+						return
+					case <-done:
 					}
-				}()
-				select {
-				case <-bc.stop:
-					return
-				case <-done:
 				}
+			} else { // Open loop.
+				timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / *poissonLambda) * float64(time.Second))
+				time.AfterFunc(timeBetweenRPCs, func() {
+					bc.poissonUnary(client, idx, reqSize, respSize, *poissonLambda)
+				})
 			}
+
 		}(idx)
 	}
 }
 
-func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string) {
+func (bc *benchmarkClient) streamingLoop(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string, poissonLambda *float64) {
 	var doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error
 	if payloadType == "bytebuf" {
 		doRPC = benchmark.DoByteBufStreamingRoundTrip
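
To make the open-loop timing concrete: grpcrand.ExpFloat64() draws from Exponential(1), so dividing by the offered load yields a wait with mean 1/lambda seconds. An illustrative instance of the expression used in both open-loop branches:

lambda := 100.0 // an OfferedLoad of 100 RPCs per second
wait := time.Duration((grpcrand.ExpFloat64() / lambda) * float64(time.Second))
// E[wait] = time.Second/100 = 10ms: on average a new RPC starts every
// 10ms, regardless of how long each individual RPC takes to complete.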
@@ -304,33 +321,69 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
 		if err != nil {
 			logger.Fatalf("%v.StreamingCall(_) = _, %v", c, err)
 		}
-		// Create histogram for each goroutine.
 		idx := ic*rpcCountPerConn + j
 		bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions)
-		// Start goroutine on the created mutex and histogram.
-		go func(idx int) {
-			// TODO: do warm up if necessary.
-			// Now relying on worker client to reserve time to do warm up.
-			// The worker client needs to wait for some time after client is created,
-			// before starting benchmark.
-			for {
-				start := time.Now()
-				if err := doRPC(stream, reqSize, respSize); err != nil {
-					return
-				}
-				elapse := time.Since(start)
-				bc.lockingHistograms[idx].add(int64(elapse))
-				select {
-				case <-bc.stop:
-					return
-				default:
+		if poissonLambda == nil { // Closed loop.
+			// Start goroutine on the created mutex and histogram.
+			go func(idx int) {
+				// TODO: do warm up if necessary.
+				// Now relying on worker client to reserve time to do warm up.
+				// The worker client needs to wait for some time after client is created,
+				// before starting benchmark.
+				for {
+					start := time.Now()
+					if err := doRPC(stream, reqSize, respSize); err != nil {
+						return
+					}
+					elapse := time.Since(start)
+					bc.lockingHistograms[idx].add(int64(elapse))
+					select {
+					case <-bc.stop:
+						return
+					default:
+					}
 				}
-			}
-		}(idx)
+			}(idx)
+		} else { // Open loop.
+			timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / *poissonLambda) * float64(time.Second))
+			time.AfterFunc(timeBetweenRPCs, func() {
+				bc.poissonStreaming(stream, idx, reqSize, respSize, *poissonLambda, doRPC)
+			})
+		}
 	}
 }
 
+func (bc *benchmarkClient) poissonUnary(client testgrpc.BenchmarkServiceClient, idx int, reqSize int, respSize int, lambda float64) {
+	go func() {
+		start := time.Now()
+		if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
+			return
+		}
+		elapse := time.Since(start)
+		bc.lockingHistograms[idx].add(int64(elapse))
+	}()
+	timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / lambda) * float64(time.Second))
+	time.AfterFunc(timeBetweenRPCs, func() {
+		bc.poissonUnary(client, idx, reqSize, respSize, lambda)
+	})
+}
+
+func (bc *benchmarkClient) poissonStreaming(stream testgrpc.BenchmarkService_StreamingCallClient, idx int, reqSize int, respSize int, lambda float64, doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error) {
+	go func() {
+		start := time.Now()
+		if err := doRPC(stream, reqSize, respSize); err != nil {
+			return
+		}
+		elapse := time.Since(start)
+		bc.lockingHistograms[idx].add(int64(elapse))
+	}()
+	timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / lambda) * float64(time.Second))
+	time.AfterFunc(timeBetweenRPCs, func() {
+		bc.poissonStreaming(stream, idx, reqSize, respSize, lambda, doRPC)
+	})
+}
+
 // getStats returns the stats for benchmark client.
 // It resets lastResetTime and all histograms if argument reset is true.
 func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats {
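
The two poisson* helpers above form a self-rescheduling timer chain: each invocation launches its RPC in a fresh goroutine (so a slow RPC never delays the next arrival) and arms a timer for the next exponentially distributed gap. A minimal standalone sketch of the same pattern, with hypothetical names, using math/rand and time, and omitting the latency-histogram bookkeeping the real code performs:

// schedule fires fn as a Poisson process with rate lambda (events/sec).
func schedule(lambda float64, fn func()) {
	wait := time.Duration((rand.ExpFloat64() / lambda) * float64(time.Second))
	time.AfterFunc(wait, func() {
		go fn()              // run the work without blocking the timer chain
		schedule(lambda, fn) // arm the timer for the next arrival
	})
}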

internal/grpcrand/grpcrand.go

Lines changed: 7 additions & 0 deletions
@@ -80,6 +80,13 @@ func Uint32() uint32 {
 	return r.Uint32()
 }
 
+// ExpFloat64 implements rand.ExpFloat64 on the grpcrand global source.
+func ExpFloat64() float64 {
+	mu.Lock()
+	defer mu.Unlock()
+	return r.ExpFloat64()
+}
+
 // Shuffle implements rand.Shuffle on the grpcrand global source.
 var Shuffle = func(n int, f func(int, int)) {
 	mu.Lock()
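
grpcrand serializes access to a single global math/rand source with a mutex because rand.Rand is not safe for concurrent use, and the benchmark client calls ExpFloat64 from many goroutines at once. A quick, illustrative sanity check of the distribution (runnable only from inside the grpc-go module, since the package is internal):

// The sample mean of Exponential(1) draws should approach 1.0.
sum := 0.0
for i := 0; i < 1000000; i++ {
	sum += grpcrand.ExpFloat64()
}
fmt.Printf("mean = %.3f (expect ~1.0)\n", sum/1e6)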
