benchmark: Unconstrained streaming benchmark (#2512)
* Adds unconstrained streaming benchmarks.
* Adds throughput to all scenarios.
* Adds comment to exported function.
* Adds comment to the new rpc.
* Adds a new run mode for unconstrained benchmarks.
* Converts counters to uint64s.
* Decreases default warm-up time.
* Addresses PR comments.
* Deletes an unnecessary select/case.
* Explains the use of RecvMsg rather than Recv (see the sketch below).
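
The RecvMsg explanation itself lands in one of the changed files that is not rendered below, so for context only: in gRPC-Go, the generated Recv allocates a fresh response message on every call, while RecvMsg unmarshals into a message the caller supplies and can reuse. A minimal sketch of that pattern follows; it is not code from this commit, and the drain helper and the choice of SimpleResponse are illustrative assumptions.

package sketch

import (
	"google.golang.org/grpc"
	testpb "google.golang.org/grpc/benchmark/grpc_testing"
)

// drain is an illustrative helper, not part of this commit: it receives messages
// in a tight loop, reusing a single SimpleResponse instead of letting the
// generated Recv allocate a new message for every call.
func drain(stream grpc.ClientStream) error {
	// Recv is roughly: m := new(SimpleResponse); err := stream.RecvMsg(m); return m, err.
	// Calling RecvMsg directly keeps that per-message allocation out of the hot loop.
	reply := new(testpb.SimpleResponse)
	for {
		if err := stream.RecvMsg(reply); err != nil {
			return err
		}
	}
}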
Can Guler authored Jan 12, 2019
1 parent bd0b3b2 commit 954fe27
Showing 4 changed files with 290 additions and 90 deletions.
192 changes: 126 additions & 66 deletions benchmark/benchmain/main.go
@@ -80,15 +80,16 @@ var allCompressionModes = []string{modeOn, modeOff, modeBoth}
var allTraceModes = []string{modeOn, modeOff, modeBoth}

const (
workloadsUnary = "unary"
workloadsStreaming = "streaming"
workloadsAll = "all"
workloadsUnary = "unary"
workloadsStreaming = "streaming"
workloadsUnconstrained = "unconstrained"
workloadsAll = "all"
)

var allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsAll}
var allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsUnconstrained, workloadsAll}

var (
runMode = []bool{true, true} // {runUnary, runStream}
runMode = []bool{true, true, true} // {runUnary, runStream, runUnconstrained}
// When latency is set to 0 (no delay), the result is still slower than a true no-delay run
// because the latency simulation section performs extra operations
ltc = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay.
@@ -113,19 +114,66 @@ var (
}
)

func unaryBenchmark(startTimer func(), stopTimer func(int32), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) {
func unaryBenchmark(startTimer func(), stopTimer func(uint64), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) uint64 {
caller, cleanup := makeFuncUnary(benchFeatures)
defer cleanup()
runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchtime, s)
return runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchtime, s)
}

func streamBenchmark(startTimer func(), stopTimer func(int32), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) {
func streamBenchmark(startTimer func(), stopTimer func(uint64), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) uint64 {
caller, cleanup := makeFuncStream(benchFeatures)
defer cleanup()
runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchtime, s)
return runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchtime, s)
}

func makeFuncUnary(benchFeatures stats.Features) (func(int), func()) {
func unconstrainedStreamBenchmark(benchFeatures stats.Features, warmuptime, benchtime time.Duration) (uint64, uint64) {
sender, recver, cleanup := makeFuncUnconstrainedStream(benchFeatures)
defer cleanup()

var (
wg sync.WaitGroup
requestCount uint64
responseCount uint64
)
wg.Add(2 * benchFeatures.MaxConcurrentCalls)

// Resets the counters once warmed up
go func() {
<-time.NewTimer(warmuptime).C
atomic.StoreUint64(&requestCount, 0)
atomic.StoreUint64(&responseCount, 0)
}()

bmEnd := time.Now().Add(benchtime + warmuptime)
for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
go func(pos int) {
for {
t := time.Now()
if t.After(bmEnd) {
break
}
sender(pos)
atomic.AddUint64(&requestCount, 1)
}
wg.Done()
}(i)
go func(pos int) {
for {
t := time.Now()
if t.After(bmEnd) {
break
}
recver(pos)
atomic.AddUint64(&responseCount, 1)
}
wg.Done()
}(i)
}
wg.Wait()
return requestCount, responseCount
}

func makeClient(benchFeatures stats.Features) (testpb.BenchmarkServiceClient, func()) {
nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
opts := []grpc.DialOption{}
sopts := []grpc.ServerOption{}
@@ -165,57 +213,22 @@ func makeFuncUnary(benchFeatures stats.Features) (func(int), func()) {
lis = nw.Listener(lis)
stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...)
conn := bm.NewClientConn("" /* target not used */, opts...)
tc := testpb.NewBenchmarkServiceClient(conn)
return testpb.NewBenchmarkServiceClient(conn), func() {
conn.Close()
stopper()
}
}

func makeFuncUnary(benchFeatures stats.Features) (func(int), func()) {
tc, cleanup := makeClient(benchFeatures)
return func(int) {
unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
}, func() {
conn.Close()
stopper()
}
unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
}, cleanup
}

func makeFuncStream(benchFeatures stats.Features) (func(int), func()) {
// TODO: Refactor to remove duplication with makeFuncUnary.
nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
opts := []grpc.DialOption{}
sopts := []grpc.ServerOption{}
if benchFeatures.EnableCompressor {
sopts = append(sopts,
grpc.RPCCompressor(grpc.NewGZIPCompressor()),
grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
)
opts = append(opts,
grpc.WithCompressor(grpc.NewGZIPCompressor()),
grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
)
}
sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
opts = append(opts, grpc.WithInsecure())
tc, cleanup := makeClient(benchFeatures)

var lis net.Listener
if *useBufconn {
bcLis := bufconn.Listen(256 * 1024)
lis = bcLis
opts = append(opts, grpc.WithDialer(func(string, time.Duration) (net.Conn, error) {
return nw.TimeoutDialer(
func(string, string, time.Duration) (net.Conn, error) {
return bcLis.Dial()
})("", "", 0)
}))
} else {
var err error
lis, err = net.Listen("tcp", "localhost:0")
if err != nil {
grpclog.Fatalf("Failed to listen: %v", err)
}
opts = append(opts, grpc.WithDialer(func(_ string, timeout time.Duration) (net.Conn, error) {
return nw.TimeoutDialer(net.DialTimeout)("tcp", lis.Addr().String(), timeout)
}))
}
lis = nw.Listener(lis)
stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...)
conn := bm.NewClientConn("" /* target not used */, opts...)
tc := testpb.NewBenchmarkServiceClient(conn)
streams := make([]testpb.BenchmarkService_StreamingCallClient, benchFeatures.MaxConcurrentCalls)
for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
stream, err := tc.StreamingCall(context.Background())
@@ -224,12 +237,36 @@ func makeFuncStream(benchFeatures stats.Features) (func(int), func()) {
}
streams[i] = stream
}

return func(pos int) {
streamCaller(streams[pos], benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
}, func() {
conn.Close()
stopper()
streamCaller(streams[pos], benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
}, cleanup
}

func makeFuncUnconstrainedStream(benchFeatures stats.Features) (func(int), func(int), func()) {
tc, cleanup := makeClient(benchFeatures)

streams := make([]testpb.BenchmarkService_StreamingCallClient, benchFeatures.MaxConcurrentCalls)
for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
stream, err := tc.UnconstrainedStreamingCall(context.Background())
if err != nil {
grpclog.Fatalf("%v.UnconstrainedStreamingCall(_) = _, %v", tc, err)
}
streams[i] = stream
}

pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, benchFeatures.ReqSizeBytes)
req := &testpb.SimpleRequest{
ResponseType: pl.Type,
ResponseSize: int32(benchFeatures.RespSizeBytes),
Payload: pl,
}

return func(pos int) {
streams[pos].Send(req)
}, func(pos int) {
streams[pos].Recv()
}, cleanup
}

func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
@@ -244,7 +281,7 @@ func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, r
}
}

func runBenchmark(caller func(int), startTimer func(), stopTimer func(int32), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) {
func runBenchmark(caller func(int), startTimer func(), stopTimer func(uint64), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) uint64 {
// Warm up connection.
for i := 0; i < 10; i++ {
caller(0)
@@ -257,7 +294,7 @@ func runBenchmark(caller func(int), startTimer func(), stopTimer func(int32), be
)
wg.Add(benchFeatures.MaxConcurrentCalls)
bmEnd := time.Now().Add(benchtime)
var count int32
var count uint64
for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
go func(pos int) {
for {
@@ -268,7 +305,7 @@ func runBenchmark(caller func(int), startTimer func(), stopTimer func(int32), be
start := time.Now()
caller(pos)
elapse := time.Since(start)
atomic.AddInt32(&count, 1)
atomic.AddUint64(&count, 1)
mu.Lock()
s.Add(elapse)
mu.Unlock()
@@ -278,6 +315,7 @@ func runBenchmark(caller func(int), startTimer func(), stopTimer func(int32), be
}
wg.Wait()
stopTimer(count)
return count
}

var useBufconn = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O")
@@ -318,12 +356,19 @@ func init() {
case workloadsUnary:
runMode[0] = true
runMode[1] = false
runMode[2] = false
case workloadsStreaming:
runMode[0] = false
runMode[1] = true
runMode[2] = false
case workloadsUnconstrained:
runMode[0] = false
runMode[1] = false
runMode[2] = true
case workloadsAll:
runMode[0] = true
runMode[1] = true
runMode[2] = true
default:
log.Fatalf("Unknown workloads setting: %v (want one of: %v)",
workloads, strings.Join(allWorkloads, ", "))
@@ -402,6 +447,14 @@ func readTimeFromInput(values *[]time.Duration, replace string) {
}
}

func printThroughput(requestCount uint64, requestSize int, responseCount uint64, responseSize int) {
requestThroughput := float64(requestCount) * float64(requestSize) * 8 / benchtime.Seconds()
responseThroughput := float64(responseCount) * float64(responseSize) * 8 / benchtime.Seconds()
fmt.Printf("Number of requests: %v\tRequest throughput: %v bit/s\n", requestCount, requestThroughput)
fmt.Printf("Number of responses: %v\tResponse throughput: %v bit/s\n", responseCount, responseThroughput)
fmt.Println()
}

func main() {
before()
featuresPos := make([]int, 9)
@@ -422,7 +475,7 @@ func main() {
startBytes = memStats.TotalAlloc
startTime = time.Now()
}
var stopTimer = func(count int32) {
var stopTimer = func(count uint64) {
runtime.ReadMemStats(&memStats)
results = testing.BenchmarkResult{N: int(count), T: time.Since(startTime),
Bytes: 0, MemAllocs: memStats.Mallocs - startAllocs, MemBytes: memStats.TotalAlloc - startBytes}
@@ -456,23 +509,30 @@ func main() {
channelz.TurnOn()
}
if runMode[0] {
unaryBenchmark(startTimer, stopTimer, benchFeature, benchtime, s)
count := unaryBenchmark(startTimer, stopTimer, benchFeature, benchtime, s)
s.SetBenchmarkResult("Unary", benchFeature, results.N,
results.AllocedBytesPerOp(), results.AllocsPerOp(), sharedPos)
fmt.Println(s.BenchString())
fmt.Println(s.String())
printThroughput(count, benchFeature.ReqSizeBytes, count, benchFeature.RespSizeBytes)
resultSlice = append(resultSlice, s.GetBenchmarkResults())
s.Clear()
}
if runMode[1] {
streamBenchmark(startTimer, stopTimer, benchFeature, benchtime, s)
count := streamBenchmark(startTimer, stopTimer, benchFeature, benchtime, s)
s.SetBenchmarkResult("Stream", benchFeature, results.N,
results.AllocedBytesPerOp(), results.AllocsPerOp(), sharedPos)
fmt.Println(s.BenchString())
fmt.Println(s.String())
printThroughput(count, benchFeature.ReqSizeBytes, count, benchFeature.RespSizeBytes)
resultSlice = append(resultSlice, s.GetBenchmarkResults())
s.Clear()
}
if runMode[2] {
requestCount, responseCount := unconstrainedStreamBenchmark(benchFeature, time.Second, benchtime)
fmt.Printf("Unconstrained Stream-%v\n", benchFeature)
printThroughput(requestCount, benchFeature.ReqSizeBytes, responseCount, benchFeature.RespSizeBytes)
}
bm.AddOne(featuresPos, featuresNum)
}
after(resultSlice)
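As a quick sanity check of the printThroughput formula added above (count * size * 8 / benchtime in seconds, reported in bit/s), here is a standalone worked example with made-up numbers; it is not output from the benchmark.

package main

import "fmt"

func main() {
	const (
		requestCount = 1000000 // hypothetical: sends completed during the measured window
		requestSize  = 1024    // hypothetical: bytes per request
		benchtimeSec = 10.0    // hypothetical: length of the measured window in seconds
	)
	// Same arithmetic as printThroughput: bytes are converted to bits and divided by the run length.
	throughput := float64(requestCount) * float64(requestSize) * 8 / benchtimeSec
	fmt.Printf("Request throughput: %v bit/s\n", throughput) // prints 8.192e+08 bit/s (about 819.2 Mbit/s)
}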
