Skip to content

Commit

Permalink
Add per call options to ygnmi (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
DanG100 authored Aug 19, 2022
1 parent ef6bc17 commit 0b877c9
Show file tree
Hide file tree
Showing 4 changed files with 453 additions and 43 deletions.
47 changes: 41 additions & 6 deletions internal/testutil/gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package testutil

import (
"context"
"io"

"github.com/openconfig/gnmi/testing/fake/gnmi"
"github.com/pkg/errors"
Expand All @@ -29,8 +30,9 @@ import (

// FakeGNMI is a running fake GNMI server.
type FakeGNMI struct {
agent *gnmi.Agent
stub *Stubber
agent *gnmi.Agent
stub *Stubber
getWrapper *getWrapper
}

// StartGNMI launches a new fake GNMI server on the given port
Expand All @@ -47,9 +49,11 @@ func StartGNMI(port int) (*FakeGNMI, error) {
if err != nil {
return nil, err
}
stub := &Stubber{gen: gen}
return &FakeGNMI{
agent: agent,
stub: &Stubber{gen: gen},
agent: agent,
stub: stub,
getWrapper: &getWrapper{stub: stub},
}, nil
}

Expand All @@ -60,7 +64,8 @@ func (g *FakeGNMI) Dial(ctx context.Context, opts ...grpc.DialOption) (gpb.GNMIC
if err != nil {
return nil, errors.Wrapf(err, "DialContext(%s, %v)", g.agent.Address(), opts)
}
return gpb.NewGNMIClient(conn), nil
g.getWrapper.GNMIClient = gpb.NewGNMIClient(conn)
return g.getWrapper, nil
}

// Stub reset the stubbed responses to empty and returns a handle to add new ones.
Expand All @@ -74,9 +79,33 @@ func (g *FakeGNMI) Requests() []*gpb.SubscribeRequest {
return g.agent.Requests()
}

// GetRequests returns the set of GetRequests sent to the gNMI server.
func (g *FakeGNMI) GetRequests() []*gpb.GetRequest {
return g.getWrapper.getRequests
}

// getWrapper adds gNMI Get functionality to a GNMI client.
type getWrapper struct {
gpb.GNMIClient
stub *Stubber
getRequests []*gpb.GetRequest
}

// Get is fake implement of gnmi.Get, it returns the GetResponse contained in the stub.
func (g *getWrapper) Get(ctx context.Context, req *gpb.GetRequest, _ ...grpc.CallOption) (*gpb.GetResponse, error) {
g.getRequests = append(g.getRequests, req)
if len(g.stub.getResponses) == 0 {
return nil, io.EOF
}
resp := g.stub.getResponses[0]
g.stub.getResponses = g.stub.getResponses[1:]
return resp, nil
}

// Stubber is a handle to add stubbed responses.
type Stubber struct {
gen *fpb.FixedGenerator
gen *fpb.FixedGenerator
getResponses []*gpb.GetResponse
}

// Notification appends the given notification as a stub response.
Expand All @@ -87,6 +116,12 @@ func (s *Stubber) Notification(n *gpb.Notification) *Stubber {
return s
}

// AppendGetResponse appends the given GetResponse as a stub response.
func (s *Stubber) AppendGetResponse(gr *gpb.GetResponse) *Stubber {
s.getResponses = append(s.getResponses, gr)
return s
}

// Sync appends a sync stub response.
func (s *Stubber) Sync() *Stubber {
s.gen.Responses = append(s.gen.Responses, &gpb.SubscribeResponse{
Expand Down
90 changes: 78 additions & 12 deletions ygnmi/gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
)

// subscribe create a gNMI SubscribeClient for the given query.
func subscribe[T any](ctx context.Context, c *Client, q AnyQuery[T], mode gpb.SubscriptionList_Mode) (_ gpb.GNMI_SubscribeClient, rerr error) {
func subscribe[T any](ctx context.Context, c *Client, q AnyQuery[T], mode gpb.SubscriptionList_Mode, o *opt) (_ gpb.GNMI_SubscribeClient, rerr error) {
var subs []*gpb.Subscription
for _, path := range q.subPaths() {
path, err := resolvePath(path)
Expand All @@ -45,7 +45,7 @@ func subscribe[T any](ctx context.Context, c *Client, q AnyQuery[T], mode gpb.Su
Elem: path.GetElem(),
Origin: path.GetOrigin(),
},
Mode: gpb.SubscriptionMode_TARGET_DEFINED,
Mode: o.mode,
})
}

Expand All @@ -61,21 +61,87 @@ func subscribe[T any](ctx context.Context, c *Client, q AnyQuery[T], mode gpb.Su
},
},
}
if o.useGet && mode != gpb.SubscriptionList_ONCE {
return nil, fmt.Errorf("using gnmi.Get is only valid for ONCE subscriptions")
}

sub, err := c.gnmiC.Subscribe(ctx)
if err != nil {
return nil, fmt.Errorf("gNMI failed to Subscribe: %w", err)
var sub gpb.GNMI_SubscribeClient
var err error
if o.useGet {
dt := gpb.GetRequest_CONFIG
if q.isState() {
dt = gpb.GetRequest_STATE
}
sub = &getSubscriber{
client: c.gnmiC,
ctx: ctx,
dataType: dt,
}
} else {
sub, err = c.gnmiC.Subscribe(ctx)
if err != nil {
return nil, fmt.Errorf("gNMI failed to Subscribe: %w", err)
}
}
defer closer.Close(&rerr, sub.CloseSend, "error closing gNMI send stream")

log.V(1).Info(prototext.Format(sr))
if !o.useGet {
log.V(1).Info(prototext.Format(sr))
}
if err := sub.Send(sr); err != nil {
return nil, fmt.Errorf("gNMI failed to Send(%+v): %w", sr, err)
}

return sub, nil
}

// getSubscriber is an implementation of gpb.GNMI_SubscribeClient that uses gpb.Get.
// Send() does the Get call and Recv returns the Get response.
type getSubscriber struct {
gpb.GNMI_SubscribeClient
client gpb.GNMIClient
ctx context.Context
notifs []*gpb.Notification
dataType gpb.GetRequest_DataType
}

// Send call gnmi.Get with a request equivalent to the SubscribeRequest.
func (gs *getSubscriber) Send(req *gpb.SubscribeRequest) error {
getReq := &gpb.GetRequest{
Prefix: req.GetSubscribe().GetPrefix(),
Encoding: gpb.Encoding_JSON_IETF,
Type: gs.dataType,
}
for _, sub := range req.GetSubscribe().GetSubscription() {
getReq.Path = append(getReq.Path, sub.GetPath())
}
log.V(1).Info(prototext.Format(getReq))
resp, err := gs.client.Get(gs.ctx, getReq)
if err != nil {
return err
}
gs.notifs = resp.GetNotification()
return nil
}

// Recv returns the result of the Get request, returning io.EOF after resonpse are read.
func (gs *getSubscriber) Recv() (*gpb.SubscribeResponse, error) {
if len(gs.notifs) == 0 {
return nil, io.EOF
}
resp := &gpb.SubscribeResponse{
Response: &gpb.SubscribeResponse_Update{
Update: gs.notifs[0],
},
}
gs.notifs = gs.notifs[1:]
return resp, nil
}

// CloseSend is noop implementation gRPC subscribe interface.
func (gs *getSubscriber) CloseSend() error {
return nil
}

// receive processes a single response from the subscription stream. If an "update" response is
// received, those points are appended to the given data and the result of that concatenation is
// the first return value, and the second return value is false. If a "sync" response is received,
Expand Down Expand Up @@ -152,14 +218,14 @@ func receive(sub gpb.GNMI_SubscribeClient, data []*DataPoint, deletesExpected bo
}
}

// receiveAll receives data until the context deadline is reached, or when in
// ONCE mode, a sync response is received.
func receiveAll(sub gpb.GNMI_SubscribeClient, deletesExpected bool, mode gpb.SubscriptionList_Mode) (data []*DataPoint, err error) {
// receiveAll receives data until the context deadline is reached, or when a sync response is received.
// This func is only used when receiving data from a ONCE subscription.
func receiveAll(sub gpb.GNMI_SubscribeClient, deletesExpected bool) (data []*DataPoint, err error) {
for {
var sync bool
data, sync, err = receive(sub, data, deletesExpected)
if err != nil {
if mode == gpb.SubscriptionList_ONCE && err == io.EOF {
if err == io.EOF {
// TODO(wenbli): It is unclear whether "subscribe ONCE stream closed without sync_response"
// should be an error, so tolerate both scenarios.
// See https://github.com/openconfig/reference/pull/156
Expand All @@ -172,7 +238,7 @@ func receiveAll(sub gpb.GNMI_SubscribeClient, deletesExpected bool, mode gpb.Sub
}
return nil, fmt.Errorf("error receiving gNMI response: %w", err)
}
if mode == gpb.SubscriptionList_ONCE && sync {
if sync {
break
}
}
Expand Down
Loading

0 comments on commit 0b877c9

Please sign in to comment.