Skip to content

Commit 16c3b7d

Browse files
authored
examples: add example for ORCA load reporting (#6114)
1 parent b458a4f commit 16c3b7d

File tree

7 files changed

+305
-3
lines changed

7 files changed

+305
-3
lines changed

examples/examples_test.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ EXAMPLES=(
6464
"features/metadata_interceptor"
6565
"features/multiplex"
6666
"features/name_resolving"
67+
"features/orca"
6768
"features/retry"
6869
"features/unix_abstract"
6970
)
@@ -75,6 +76,7 @@ declare -A SERVER_ARGS=(
7576

7677
declare -A CLIENT_ARGS=(
7778
["features/unix_abstract"]="-addr $UNIX_ADDR"
79+
["features/orca"]="-test=true"
7880
["default"]="-addr localhost:$SERVER_PORT"
7981
)
8082

@@ -114,6 +116,7 @@ declare -A EXPECTED_SERVER_OUTPUT=(
114116
["features/metadata_interceptor"]="key1 from metadata: "
115117
["features/multiplex"]=":50051"
116118
["features/name_resolving"]="serving on localhost:50051"
119+
["features/orca"]="Server listening"
117120
["features/retry"]="request succeeded count: 4"
118121
["features/unix_abstract"]="serving on @abstract-unix-socket"
119122
)
@@ -134,6 +137,7 @@ declare -A EXPECTED_CLIENT_OUTPUT=(
134137
["features/metadata_interceptor"]="BidiStreaming Echo: hello world"
135138
["features/multiplex"]="Greeting: Hello multiplex"
136139
["features/name_resolving"]="calling helloworld.Greeter/SayHello to \"example:///resolver.example.grpc.io\""
140+
["features/orca"]="Per-call load report received: map\[db_queries:10\]"
137141
["features/retry"]="UnaryEcho reply: message:\"Try and Success\""
138142
["features/unix_abstract"]="calling echo.Echo/UnaryEcho to unix-abstract:abstract-unix-socket"
139143
)

examples/features/orca/README.md

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# ORCA Load Reporting
2+
3+
ORCA is a protocol for reporting load between servers and clients. This
4+
example shows how to implement this from both the client and server side. For
5+
more details, please see [gRFC
6+
A51](https://github.com/grpc/proposal/blob/master/A51-custom-backend-metrics.md)
7+
8+
## Try it
9+
10+
```
11+
go run server/main.go
12+
```
13+
14+
```
15+
go run client/main.go
16+
```
17+
18+
## Explanation
19+
20+
gRPC ORCA support provides two different ways to report load data to clients
21+
from servers: out-of-band and per-RPC. Out-of-band metrics are reported
22+
regularly at some interval on a stream, while per-RPC metrics are reported
23+
along with the trailers at the end of a call. Both of these mechanisms are
24+
optional and work independently.
25+
26+
The full ORCA API documentation is available here:
27+
https://pkg.go.dev/google.golang.org/grpc/orca
28+
29+
### Out-of-band Metrics
30+
31+
The server registers an ORCA service that is used for out-of-band metrics. It
32+
does this by using `orca.Register()` and then setting metrics on the returned
33+
`orca.Service` using its methods.
34+
35+
The client receives out-of-band metrics via the LB policy. It receives
36+
callbacks to a listener by registering the listener on a `SubConn` via
37+
`orca.RegisterOOBListener`.
38+
39+
### Per-RPC Metrics
40+
41+
The server is set up to report query cost metrics in its RPC handler. For
42+
per-RPC metrics to be reported, the gRPC server must be created with the
43+
`orca.CallMetricsServerOption()` option, and metrics are set by calling methods
44+
on the returned `orca.CallMetricRecorder` from
45+
`orca.CallMetricRecorderFromContext()`.
46+
47+
The client performs one RPC per second. Per-RPC metrics are available for each
48+
call via the `Done()` callback returned from the LB policy's picker.

examples/features/orca/client/main.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
*
3+
* Copyright 2023 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
// Binary client is an example client.
20+
package main
21+
22+
import (
23+
"context"
24+
"flag"
25+
"fmt"
26+
"log"
27+
"time"
28+
29+
"google.golang.org/grpc"
30+
"google.golang.org/grpc/balancer"
31+
"google.golang.org/grpc/connectivity"
32+
"google.golang.org/grpc/credentials/insecure"
33+
"google.golang.org/grpc/orca"
34+
35+
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
36+
pb "google.golang.org/grpc/examples/features/proto/echo"
37+
)
38+
39+
var addr = flag.String("addr", "localhost:50051", "the address to connect to")
40+
var test = flag.Bool("test", false, "if set, only 1 RPC is performed before exiting")
41+
42+
func main() {
43+
flag.Parse()
44+
45+
// Set up a connection to the server. Configure to use our custom LB
46+
// policy which will receive all the ORCA load reports.
47+
conn, err := grpc.Dial(*addr,
48+
grpc.WithTransportCredentials(insecure.NewCredentials()),
49+
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"orca_example":{}}]}`),
50+
)
51+
if err != nil {
52+
log.Fatalf("did not connect: %v", err)
53+
}
54+
defer conn.Close()
55+
56+
c := pb.NewEchoClient(conn)
57+
58+
// Perform RPCs once per second.
59+
ticker := time.NewTicker(time.Second)
60+
for range ticker.C {
61+
func() {
62+
// Use an anonymous function to ensure context cancelation via defer.
63+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
64+
defer cancel()
65+
if _, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: "test echo message"}); err != nil {
66+
log.Fatalf("Error from UnaryEcho call: %v", err)
67+
}
68+
}()
69+
if *test {
70+
return
71+
}
72+
}
73+
74+
}
75+
76+
// Register an ORCA load balancing policy to receive per-call metrics and
77+
// out-of-band metrics.
78+
func init() {
79+
balancer.Register(orcaLBBuilder{})
80+
}
81+
82+
type orcaLBBuilder struct{}
83+
84+
func (orcaLBBuilder) Name() string { return "orca_example" }
85+
func (orcaLBBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
86+
return &orcaLB{cc: cc}
87+
}
88+
89+
// orcaLB is an incomplete LB policy designed to show basic ORCA load reporting
90+
// functionality. It collects per-call metrics in the `Done` callback returned
91+
// by its picker, and it collects out-of-band metrics by registering a listener
92+
// when its SubConn is created. It does not follow general LB policy best
93+
// practices and makes assumptions about the simple test environment it is
94+
// designed to run within.
95+
type orcaLB struct {
96+
cc balancer.ClientConn
97+
}
98+
99+
func (o *orcaLB) UpdateClientConnState(ccs balancer.ClientConnState) error {
100+
// We assume only one update, ever, containing exactly one address, given
101+
// the use of the "passthrough" (default) name resolver.
102+
103+
addrs := ccs.ResolverState.Addresses
104+
if len(addrs) != 1 {
105+
return fmt.Errorf("orcaLB: expected 1 address; received: %v", addrs)
106+
}
107+
108+
// Create one SubConn for the address and connect it.
109+
sc, err := o.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})
110+
if err != nil {
111+
return fmt.Errorf("orcaLB: error creating SubConn: %v", err)
112+
}
113+
sc.Connect()
114+
115+
// Register a simple ORCA OOB listener on the SubConn. We request a 1
116+
// second report interval, but in this example the server indicated the
117+
// minimum interval it will allow is 3 seconds, so reports will only be
118+
// sent that often.
119+
orca.RegisterOOBListener(sc, orcaLis{}, orca.OOBListenerOptions{ReportInterval: time.Second})
120+
121+
return nil
122+
}
123+
124+
func (o *orcaLB) ResolverError(error) {}
125+
126+
func (o *orcaLB) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
127+
if scs.ConnectivityState == connectivity.Ready {
128+
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &picker{sc}})
129+
}
130+
}
131+
132+
func (o *orcaLB) Close() {}
133+
134+
type picker struct {
135+
sc balancer.SubConn
136+
}
137+
138+
func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
139+
return balancer.PickResult{
140+
SubConn: p.sc,
141+
Done: func(di balancer.DoneInfo) {
142+
fmt.Println("Per-call load report received:", di.ServerLoad.(*v3orcapb.OrcaLoadReport).GetRequestCost())
143+
},
144+
}, nil
145+
}
146+
147+
// orcaLis is the out-of-band load report listener that we pass to
148+
// orca.RegisterOOBListener to receive periodic load report information.
149+
type orcaLis struct{}
150+
151+
func (orcaLis) OnLoadReport(lr *v3orcapb.OrcaLoadReport) {
152+
fmt.Println("Out-of-band load report received:", lr)
153+
}

examples/features/orca/server/main.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
*
3+
* Copyright 2023 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
// Binary server is an example server.
20+
package main
21+
22+
import (
23+
"context"
24+
"flag"
25+
"fmt"
26+
"log"
27+
"net"
28+
"time"
29+
30+
"google.golang.org/grpc"
31+
"google.golang.org/grpc/codes"
32+
"google.golang.org/grpc/internal"
33+
"google.golang.org/grpc/orca"
34+
"google.golang.org/grpc/status"
35+
36+
pb "google.golang.org/grpc/examples/features/proto/echo"
37+
)
38+
39+
var port = flag.Int("port", 50051, "the port to serve on")
40+
41+
type server struct {
42+
pb.UnimplementedEchoServer
43+
}
44+
45+
func (s *server) UnaryEcho(ctx context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) {
46+
// Report a sample cost for this query.
47+
cmr := orca.CallMetricRecorderFromContext(ctx)
48+
if cmr == nil {
49+
return nil, status.Errorf(codes.Internal, "unable to retrieve call metric recorder (missing ORCA ServerOption?)")
50+
}
51+
cmr.SetRequestCost("db_queries", 10)
52+
53+
return &pb.EchoResponse{Message: in.Message}, nil
54+
}
55+
56+
func main() {
57+
flag.Parse()
58+
59+
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
60+
if err != nil {
61+
log.Fatalf("Failed to listen: %v", err)
62+
}
63+
fmt.Printf("Server listening at %v\n", lis.Addr())
64+
65+
// Create the gRPC server with the orca.CallMetricsServerOption() option,
66+
// which will enable per-call metric recording.
67+
s := grpc.NewServer(orca.CallMetricsServerOption())
68+
pb.RegisterEchoServer(s, &server{})
69+
70+
// Register the orca service for out-of-band metric reporting, and set the
71+
// minimum reporting interval to 3 seconds. Note that, by default, the
72+
// minimum interval must be at least 30 seconds, but 3 seconds is set via
73+
// an internal-only option for illustration purposes only.
74+
opts := orca.ServiceOptions{MinReportingInterval: 3 * time.Second}
75+
internal.ORCAAllowAnyMinReportingInterval.(func(so *orca.ServiceOptions))(&opts)
76+
orcaSvc, err := orca.Register(s, opts)
77+
if err != nil {
78+
log.Fatalf("Failed to register ORCA service: %v", err)
79+
}
80+
81+
// Simulate CPU utilization reporting.
82+
go func() {
83+
for {
84+
orcaSvc.SetCPUUtilization(.5)
85+
time.Sleep(2 * time.Second)
86+
orcaSvc.SetCPUUtilization(.9)
87+
time.Sleep(2 * time.Second)
88+
}
89+
}()
90+
91+
s.Serve(lis)
92+
}

examples/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module google.golang.org/grpc/examples
33
go 1.17
44

55
require (
6+
github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b
67
github.com/golang/protobuf v1.5.2
78
golang.org/x/oauth2 v0.4.0
89
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f
@@ -16,7 +17,6 @@ require (
1617
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
1718
github.com/cespare/xxhash/v2 v2.2.0 // indirect
1819
github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe // indirect
19-
github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b // indirect
2020
github.com/envoyproxy/go-control-plane v0.10.3 // indirect
2121
github.com/envoyproxy/protoc-gen-validate v0.9.1 // indirect
2222
golang.org/x/net v0.8.0 // indirect

internal/internal.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@ var (
137137
//
138138
// TODO: Remove this function once the RBAC env var is removed.
139139
UnregisterRBACHTTPFilterForTesting func()
140+
141+
// ORCAAllowAnyMinReportingInterval is for examples/orca use ONLY.
142+
ORCAAllowAnyMinReportingInterval interface{} // func(so *orca.ServiceOptions)
140143
)
141144

142145
// HealthChecker defines the signature of the client-side LB channel health checking function.

orca/service.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ import (
2424

2525
"google.golang.org/grpc"
2626
"google.golang.org/grpc/codes"
27-
"google.golang.org/grpc/orca/internal"
27+
"google.golang.org/grpc/internal"
28+
ointernal "google.golang.org/grpc/orca/internal"
2829
"google.golang.org/grpc/status"
2930

3031
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
@@ -33,9 +34,10 @@ import (
3334
)
3435

3536
func init() {
36-
internal.AllowAnyMinReportingInterval = func(so *ServiceOptions) {
37+
ointernal.AllowAnyMinReportingInterval = func(so *ServiceOptions) {
3738
so.allowAnyMinReportingInterval = true
3839
}
40+
internal.ORCAAllowAnyMinReportingInterval = ointernal.AllowAnyMinReportingInterval
3941
}
4042

4143
// minReportingInterval is the absolute minimum value supported for

0 commit comments

Comments
 (0)