@@ -109,6 +109,7 @@ var (
109
109
clientWriteBufferSize = flags .IntSlice ("clientWriteBufferSize" , []int {- 1 }, "Configures the client write buffer size in bytes. If negative, use the default - may be a a comma-separated list" )
110
110
serverReadBufferSize = flags .IntSlice ("serverReadBufferSize" , []int {- 1 }, "Configures the server read buffer size in bytes. If negative, use the default - may be a a comma-separated list" )
111
111
serverWriteBufferSize = flags .IntSlice ("serverWriteBufferSize" , []int {- 1 }, "Configures the server write buffer size in bytes. If negative, use the default - may be a a comma-separated list" )
112
+ sharedRecvBufferPool = flags .StringWithAllowedValues ("sharedRecvBufferPool" , sharedRecvBufferPoolAll , "Configures the shared receive buffer pool. One of: nil, simple" , allSharedRecvBufferPools )
112
113
113
114
logger = grpclog .Component ("benchmark" )
114
115
)
@@ -133,6 +134,10 @@ const (
133
134
networkModeLAN = "LAN"
134
135
networkModeWAN = "WAN"
135
136
networkLongHaul = "Longhaul"
137
+ // Shared recv buffer pool
138
+ sharedRecvBufferPoolNil = "nil"
139
+ sharedRecvBufferPoolSimple = "simple"
140
+ sharedRecvBufferPoolAll = "all"
136
141
137
142
numStatsBuckets = 10
138
143
warmupCallCount = 10
@@ -144,6 +149,7 @@ var (
144
149
allCompModes = []string {compModeOff , compModeGzip , compModeNop , compModeAll }
145
150
allToggleModes = []string {toggleModeOff , toggleModeOn , toggleModeBoth }
146
151
allNetworkModes = []string {networkModeNone , networkModeLocal , networkModeLAN , networkModeWAN , networkLongHaul }
152
+ allSharedRecvBufferPools = []string {sharedRecvBufferPoolNil , sharedRecvBufferPoolSimple , sharedRecvBufferPoolAll }
147
153
defaultReadLatency = []time.Duration {0 , 40 * time .Millisecond } // if non-positive, no delay.
148
154
defaultReadKbps = []int {0 , 10240 } // if non-positive, infinite
149
155
defaultReadMTU = []int {0 } // if non-positive, infinite
@@ -321,6 +327,15 @@ func makeClient(bf stats.Features) (testgrpc.BenchmarkServiceClient, func()) {
321
327
if bf .ServerWriteBufferSize >= 0 {
322
328
sopts = append (sopts , grpc .WriteBufferSize (bf .ServerWriteBufferSize ))
323
329
}
330
+ switch bf .SharedRecvBufferPool {
331
+ case sharedRecvBufferPoolNil :
332
+ // Do nothing.
333
+ case sharedRecvBufferPoolSimple :
334
+ opts = append (opts , grpc .WithSharedRecvBufferPool (newSimpleSharedRecvBufferPool ()))
335
+ sopts = append (sopts , grpc .SharedRecvBufferPool (newSimpleSharedRecvBufferPool ()))
336
+ default :
337
+ logger .Fatalf ("Unknown shared recv buffer pool type: %v" , bf .SharedRecvBufferPool )
338
+ }
324
339
325
340
sopts = append (sopts , grpc .MaxConcurrentStreams (uint32 (bf .MaxConcurrentCalls + 1 )))
326
341
opts = append (opts , grpc .WithTransportCredentials (insecure .NewCredentials ()))
@@ -528,6 +543,7 @@ type featureOpts struct {
528
543
clientWriteBufferSize []int
529
544
serverReadBufferSize []int
530
545
serverWriteBufferSize []int
546
+ sharedRecvBufferPools []string
531
547
}
532
548
533
549
// makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each
@@ -572,6 +588,8 @@ func makeFeaturesNum(b *benchOpts) []int {
572
588
featuresNum [i ] = len (b .features .serverReadBufferSize )
573
589
case stats .ServerWriteBufferSize :
574
590
featuresNum [i ] = len (b .features .serverWriteBufferSize )
591
+ case stats .SharedRecvBufferPool :
592
+ featuresNum [i ] = len (b .features .sharedRecvBufferPools )
575
593
default :
576
594
log .Fatalf ("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v" , i , stats .MaxFeatureIndex )
577
595
}
@@ -638,6 +656,7 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
638
656
ClientWriteBufferSize : b .features .clientWriteBufferSize [curPos [stats .ClientWriteBufferSize ]],
639
657
ServerReadBufferSize : b .features .serverReadBufferSize [curPos [stats .ServerReadBufferSize ]],
640
658
ServerWriteBufferSize : b .features .serverWriteBufferSize [curPos [stats .ServerWriteBufferSize ]],
659
+ SharedRecvBufferPool : b .features .sharedRecvBufferPools [curPos [stats .SharedRecvBufferPool ]],
641
660
}
642
661
if len (b .features .reqPayloadCurves ) == 0 {
643
662
f .ReqSizeBytes = b .features .reqSizeBytes [curPos [stats .ReqSizeBytesIndex ]]
@@ -708,6 +727,7 @@ func processFlags() *benchOpts {
708
727
clientWriteBufferSize : append ([]int (nil ), * clientWriteBufferSize ... ),
709
728
serverReadBufferSize : append ([]int (nil ), * serverReadBufferSize ... ),
710
729
serverWriteBufferSize : append ([]int (nil ), * serverWriteBufferSize ... ),
730
+ sharedRecvBufferPools : setSharedRecvBufferPool (* sharedRecvBufferPool ),
711
731
},
712
732
}
713
733
@@ -783,6 +803,19 @@ func setCompressorMode(val string) []string {
783
803
}
784
804
}
785
805
806
+ func setSharedRecvBufferPool (val string ) []string {
807
+ switch val {
808
+ case sharedRecvBufferPoolNil , sharedRecvBufferPoolSimple :
809
+ return []string {val }
810
+ case sharedRecvBufferPoolAll :
811
+ return []string {sharedRecvBufferPoolNil , sharedRecvBufferPoolSimple }
812
+ default :
813
+ // This should never happen because a wrong value passed to this flag would
814
+ // be caught during flag.Parse().
815
+ return []string {}
816
+ }
817
+ }
818
+
786
819
func main () {
787
820
opts := processFlags ()
788
821
before (opts )
@@ -882,3 +915,33 @@ type nopDecompressor struct{}
882
915
883
916
func (nopDecompressor ) Do (r io.Reader ) ([]byte , error ) { return io .ReadAll (r ) }
884
917
func (nopDecompressor ) Type () string { return compModeNop }
918
+
919
+ // simpleSharedRecvBufferPool is a simple implementation of sharedRecvBufferPool.
920
+ type simpleSharedRecvBufferPool struct {
921
+ sync.Pool
922
+ }
923
+
924
+ func newSimpleSharedRecvBufferPool () * simpleSharedRecvBufferPool {
925
+ return & simpleSharedRecvBufferPool {
926
+ Pool : sync.Pool {
927
+ New : func () interface {} {
928
+ bs := make ([]byte , 0 )
929
+ return & bs
930
+ },
931
+ },
932
+ }
933
+ }
934
+
935
+ func (p * simpleSharedRecvBufferPool ) Get (size int ) []byte {
936
+ bs := p .Pool .Get ().(* []byte )
937
+ if cap (* bs ) < size {
938
+ * bs = make ([]byte , size )
939
+ return * bs
940
+ }
941
+
942
+ return (* bs )[:size ]
943
+ }
944
+
945
+ func (p * simpleSharedRecvBufferPool ) Put (bs * []byte ) {
946
+ p .Pool .Put (bs )
947
+ }
0 commit comments