diff --git a/internal/testutil/gnmi.go b/internal/testutil/gnmi.go index 0a5f49e..a8a60e1 100644 --- a/internal/testutil/gnmi.go +++ b/internal/testutil/gnmi.go @@ -17,6 +17,7 @@ package testutil import ( "context" + "io" "github.com/openconfig/gnmi/testing/fake/gnmi" "github.com/pkg/errors" @@ -29,8 +30,9 @@ import ( // FakeGNMI is a running fake GNMI server. type FakeGNMI struct { - agent *gnmi.Agent - stub *Stubber + agent *gnmi.Agent + stub *Stubber + getWrapper *getWrapper } // StartGNMI launches a new fake GNMI server on the given port @@ -47,9 +49,11 @@ func StartGNMI(port int) (*FakeGNMI, error) { if err != nil { return nil, err } + stub := &Stubber{gen: gen} return &FakeGNMI{ - agent: agent, - stub: &Stubber{gen: gen}, + agent: agent, + stub: stub, + getWrapper: &getWrapper{stub: stub}, }, nil } @@ -60,7 +64,8 @@ func (g *FakeGNMI) Dial(ctx context.Context, opts ...grpc.DialOption) (gpb.GNMIC if err != nil { return nil, errors.Wrapf(err, "DialContext(%s, %v)", g.agent.Address(), opts) } - return gpb.NewGNMIClient(conn), nil + g.getWrapper.GNMIClient = gpb.NewGNMIClient(conn) + return g.getWrapper, nil } // Stub reset the stubbed responses to empty and returns a handle to add new ones. @@ -74,9 +79,33 @@ func (g *FakeGNMI) Requests() []*gpb.SubscribeRequest { return g.agent.Requests() } +// GetRequests returns the set of GetRequests sent to the gNMI server. +func (g *FakeGNMI) GetRequests() []*gpb.GetRequest { + return g.getWrapper.getRequests +} + +// getWrapper adds gNMI Get functionality to a GNMI client. +type getWrapper struct { + gpb.GNMIClient + stub *Stubber + getRequests []*gpb.GetRequest +} + +// Get is fake implement of gnmi.Get, it returns the GetResponse contained in the stub. +func (g *getWrapper) Get(ctx context.Context, req *gpb.GetRequest, _ ...grpc.CallOption) (*gpb.GetResponse, error) { + g.getRequests = append(g.getRequests, req) + if len(g.stub.getResponses) == 0 { + return nil, io.EOF + } + resp := g.stub.getResponses[0] + g.stub.getResponses = g.stub.getResponses[1:] + return resp, nil +} + // Stubber is a handle to add stubbed responses. type Stubber struct { - gen *fpb.FixedGenerator + gen *fpb.FixedGenerator + getResponses []*gpb.GetResponse } // Notification appends the given notification as a stub response. @@ -87,6 +116,12 @@ func (s *Stubber) Notification(n *gpb.Notification) *Stubber { return s } +// AppendGetResponse appends the given GetResponse as a stub response. +func (s *Stubber) AppendGetResponse(gr *gpb.GetResponse) *Stubber { + s.getResponses = append(s.getResponses, gr) + return s +} + // Sync appends a sync stub response. func (s *Stubber) Sync() *Stubber { s.gen.Responses = append(s.gen.Responses, &gpb.SubscribeResponse{ diff --git a/ygnmi/gnmi.go b/ygnmi/gnmi.go index 7e7ff13..f0b1ae2 100644 --- a/ygnmi/gnmi.go +++ b/ygnmi/gnmi.go @@ -33,7 +33,7 @@ import ( ) // subscribe create a gNMI SubscribeClient for the given query. -func subscribe[T any](ctx context.Context, c *Client, q AnyQuery[T], mode gpb.SubscriptionList_Mode) (_ gpb.GNMI_SubscribeClient, rerr error) { +func subscribe[T any](ctx context.Context, c *Client, q AnyQuery[T], mode gpb.SubscriptionList_Mode, o *opt) (_ gpb.GNMI_SubscribeClient, rerr error) { var subs []*gpb.Subscription for _, path := range q.subPaths() { path, err := resolvePath(path) @@ -45,7 +45,7 @@ func subscribe[T any](ctx context.Context, c *Client, q AnyQuery[T], mode gpb.Su Elem: path.GetElem(), Origin: path.GetOrigin(), }, - Mode: gpb.SubscriptionMode_TARGET_DEFINED, + Mode: o.mode, }) } @@ -61,14 +61,32 @@ func subscribe[T any](ctx context.Context, c *Client, q AnyQuery[T], mode gpb.Su }, }, } + if o.useGet && mode != gpb.SubscriptionList_ONCE { + return nil, fmt.Errorf("using gnmi.Get is only valid for ONCE subscriptions") + } - sub, err := c.gnmiC.Subscribe(ctx) - if err != nil { - return nil, fmt.Errorf("gNMI failed to Subscribe: %w", err) + var sub gpb.GNMI_SubscribeClient + var err error + if o.useGet { + dt := gpb.GetRequest_CONFIG + if q.isState() { + dt = gpb.GetRequest_STATE + } + sub = &getSubscriber{ + client: c.gnmiC, + ctx: ctx, + dataType: dt, + } + } else { + sub, err = c.gnmiC.Subscribe(ctx) + if err != nil { + return nil, fmt.Errorf("gNMI failed to Subscribe: %w", err) + } } defer closer.Close(&rerr, sub.CloseSend, "error closing gNMI send stream") - - log.V(1).Info(prototext.Format(sr)) + if !o.useGet { + log.V(1).Info(prototext.Format(sr)) + } if err := sub.Send(sr); err != nil { return nil, fmt.Errorf("gNMI failed to Send(%+v): %w", sr, err) } @@ -76,6 +94,54 @@ func subscribe[T any](ctx context.Context, c *Client, q AnyQuery[T], mode gpb.Su return sub, nil } +// getSubscriber is an implementation of gpb.GNMI_SubscribeClient that uses gpb.Get. +// Send() does the Get call and Recv returns the Get response. +type getSubscriber struct { + gpb.GNMI_SubscribeClient + client gpb.GNMIClient + ctx context.Context + notifs []*gpb.Notification + dataType gpb.GetRequest_DataType +} + +// Send call gnmi.Get with a request equivalent to the SubscribeRequest. +func (gs *getSubscriber) Send(req *gpb.SubscribeRequest) error { + getReq := &gpb.GetRequest{ + Prefix: req.GetSubscribe().GetPrefix(), + Encoding: gpb.Encoding_JSON_IETF, + Type: gs.dataType, + } + for _, sub := range req.GetSubscribe().GetSubscription() { + getReq.Path = append(getReq.Path, sub.GetPath()) + } + log.V(1).Info(prototext.Format(getReq)) + resp, err := gs.client.Get(gs.ctx, getReq) + if err != nil { + return err + } + gs.notifs = resp.GetNotification() + return nil +} + +// Recv returns the result of the Get request, returning io.EOF after resonpse are read. +func (gs *getSubscriber) Recv() (*gpb.SubscribeResponse, error) { + if len(gs.notifs) == 0 { + return nil, io.EOF + } + resp := &gpb.SubscribeResponse{ + Response: &gpb.SubscribeResponse_Update{ + Update: gs.notifs[0], + }, + } + gs.notifs = gs.notifs[1:] + return resp, nil +} + +// CloseSend is noop implementation gRPC subscribe interface. +func (gs *getSubscriber) CloseSend() error { + return nil +} + // receive processes a single response from the subscription stream. If an "update" response is // received, those points are appended to the given data and the result of that concatenation is // the first return value, and the second return value is false. If a "sync" response is received, @@ -152,14 +218,14 @@ func receive(sub gpb.GNMI_SubscribeClient, data []*DataPoint, deletesExpected bo } } -// receiveAll receives data until the context deadline is reached, or when in -// ONCE mode, a sync response is received. -func receiveAll(sub gpb.GNMI_SubscribeClient, deletesExpected bool, mode gpb.SubscriptionList_Mode) (data []*DataPoint, err error) { +// receiveAll receives data until the context deadline is reached, or when a sync response is received. +// This func is only used when receiving data from a ONCE subscription. +func receiveAll(sub gpb.GNMI_SubscribeClient, deletesExpected bool) (data []*DataPoint, err error) { for { var sync bool data, sync, err = receive(sub, data, deletesExpected) if err != nil { - if mode == gpb.SubscriptionList_ONCE && err == io.EOF { + if err == io.EOF { // TODO(wenbli): It is unclear whether "subscribe ONCE stream closed without sync_response" // should be an error, so tolerate both scenarios. // See https://github.com/openconfig/reference/pull/156 @@ -172,7 +238,7 @@ func receiveAll(sub gpb.GNMI_SubscribeClient, deletesExpected bool, mode gpb.Sub } return nil, fmt.Errorf("error receiving gNMI response: %w", err) } - if mode == gpb.SubscriptionList_ONCE && sync { + if sync { break } } diff --git a/ygnmi/ygnmi.go b/ygnmi/ygnmi.go index e637620..ad8aa09 100644 --- a/ygnmi/ygnmi.go +++ b/ygnmi/ygnmi.go @@ -141,13 +141,46 @@ func NewClient(c gpb.GNMIClient, opts ...ClientOption) (*Client, error) { return yc, nil } +// Option can be used modify the behavior of the gNMI requests used by the ygnmi calls (Lookup, Await, etc.). +type Option func(*opt) + +type opt struct { + useGet bool + mode gpb.SubscriptionMode +} + +// resolveOpts applies all the options and returns a struct containing the result. +func resolveOpts(opts []Option) *opt { + o := &opt{} + for _, opt := range opts { + opt(o) + } + return o +} + +// WithUseGet creates an option to use gnmi.Get instead of gnmi.Subscribe. +// This can only be used on Get, GetAll, Lookup, and LookupAll. +func WithUseGet() Option { + return func(o *opt) { + o.useGet = true + } +} + +// WithSubscriptionMode creates to an option to use input instead of the default (TARGET_DEFINED). +// This option is only relevant for Watch, WatchAll, Collect, CollectAll, Await which are STREAM subscriptions. +func WithSubscriptionMode(mode gpb.SubscriptionMode) Option { + return func(o *opt) { + o.mode = mode + } +} + // Lookup fetches the value of a SingletonQuery with a ONCE subscription. -func Lookup[T any](ctx context.Context, c *Client, q SingletonQuery[T]) (*Value[T], error) { - sub, err := subscribe[T](ctx, c, q, gpb.SubscriptionList_ONCE) +func Lookup[T any](ctx context.Context, c *Client, q SingletonQuery[T], opts ...Option) (*Value[T], error) { + sub, err := subscribe[T](ctx, c, q, gpb.SubscriptionList_ONCE, resolveOpts(opts)) if err != nil { return nil, fmt.Errorf("failed to subscribe to path: %w", err) } - data, err := receiveAll(sub, false, gpb.SubscriptionList_ONCE) + data, err := receiveAll(sub, false) if err != nil { return nil, fmt.Errorf("failed to receive to data: %w", err) } @@ -174,9 +207,9 @@ var ( // Get fetches the value of a SingletonQuery with a ONCE subscription, // returning an error that wraps ErrNotPresent if the value is not present. // Use Lookup to get metadata and tolerate non-present data. -func Get[T any](ctx context.Context, c *Client, q SingletonQuery[T]) (T, error) { +func Get[T any](ctx context.Context, c *Client, q SingletonQuery[T], opts ...Option) (T, error) { var zero T - val, err := Lookup(ctx, c, q) + val, err := Lookup(ctx, c, q, opts...) if err != nil { return zero, err } @@ -210,12 +243,12 @@ func (w *Watcher[T]) Await() (*Value[T], error) { // or a non-nil error on failure. Watch can also be stopped by setting a deadline on or canceling the context. // Calling Await on the returned Watcher waits for the subscription to complete. // It returns the last observed value and a boolean that indicates whether that value satisfies the predicate. -func Watch[T any](ctx context.Context, c *Client, q SingletonQuery[T], pred func(*Value[T]) error) *Watcher[T] { +func Watch[T any](ctx context.Context, c *Client, q SingletonQuery[T], pred func(*Value[T]) error, opts ...Option) *Watcher[T] { w := &Watcher[T]{ errCh: make(chan error, 1), } - sub, err := subscribe[T](ctx, c, q, gpb.SubscriptionList_STREAM) + sub, err := subscribe[T](ctx, c, q, gpb.SubscriptionList_STREAM, resolveOpts(opts)) if err != nil { w.errCh <- err return w @@ -251,13 +284,13 @@ func Watch[T any](ctx context.Context, c *Client, q SingletonQuery[T], pred func // blocking until a value that is deep equal to the specified val is received // or the context is cancelled. To wait for a generic predicate, or to make a // non-blocking call, use the Watch method instead. -func Await[T any](ctx context.Context, c *Client, q SingletonQuery[T], val T) (*Value[T], error) { +func Await[T any](ctx context.Context, c *Client, q SingletonQuery[T], val T, opts ...Option) (*Value[T], error) { w := Watch(ctx, c, q, func(v *Value[T]) error { if v.present && reflect.DeepEqual(v.val, val) { return nil } return Continue - }) + }, opts...) return w.Await() } @@ -277,7 +310,7 @@ func (c *Collector[T]) Await() ([]*Value[T], error) { // Collect starts an asynchronous collection of the values at the query with a STREAM subscription. // Calling Await on the return Collection waits until the context is cancelled and returns the collected values. -func Collect[T any](ctx context.Context, c *Client, q SingletonQuery[T]) *Collector[T] { +func Collect[T any](ctx context.Context, c *Client, q SingletonQuery[T], opts ...Option) *Collector[T] { collect := &Collector[T]{} collect.w = Watch(ctx, c, q, func(v *Value[T]) error { if !q.isLeaf() { @@ -290,18 +323,18 @@ func Collect[T any](ctx context.Context, c *Client, q SingletonQuery[T]) *Collec } collect.data = append(collect.data, v) return Continue - }) + }, opts...) return collect } // LookupAll fetches the values of a WildcardQuery with a ONCE subscription. // It returns an empty list if no values are present at the path. -func LookupAll[T any](ctx context.Context, c *Client, q WildcardQuery[T]) ([]*Value[T], error) { - sub, err := subscribe[T](ctx, c, q, gpb.SubscriptionList_ONCE) +func LookupAll[T any](ctx context.Context, c *Client, q WildcardQuery[T], opts ...Option) ([]*Value[T], error) { + sub, err := subscribe[T](ctx, c, q, gpb.SubscriptionList_ONCE, resolveOpts(opts)) if err != nil { return nil, fmt.Errorf("failed to subscribe to path: %w", err) } - data, err := receiveAll(sub, false, gpb.SubscriptionList_ONCE) + data, err := receiveAll(sub, false) if err != nil { return nil, fmt.Errorf("failed to receive to data: %w", err) } @@ -333,8 +366,8 @@ func LookupAll[T any](ctx context.Context, c *Client, q WildcardQuery[T]) ([]*Va // GetAll fetches the value of a WildcardQuery with a ONCE subscription skipping any non-present paths. // It returns an error that wraps ErrNotPresent if no values were received. // Use LookupAll to also get metadata containing the returned paths. -func GetAll[T any](ctx context.Context, c *Client, q WildcardQuery[T]) ([]T, error) { - vals, err := LookupAll(ctx, c, q) +func GetAll[T any](ctx context.Context, c *Client, q WildcardQuery[T], opts ...Option) ([]T, error) { + vals, err := LookupAll(ctx, c, q, opts...) if err != nil { return nil, err } @@ -355,7 +388,7 @@ func GetAll[T any](ctx context.Context, c *Client, q WildcardQuery[T]) ([]T, err // or a non-nil error on failure. Watch can also be stopped by setting a deadline on or canceling the context. // Calling Await on the returned Watcher waits for the subscription to complete. // It returns the last observed value and a boolean that indicates whether that value satisfies the predicate. -func WatchAll[T any](ctx context.Context, c *Client, q WildcardQuery[T], pred func(*Value[T]) error) *Watcher[T] { +func WatchAll[T any](ctx context.Context, c *Client, q WildcardQuery[T], pred func(*Value[T]) error, opts ...Option) *Watcher[T] { w := &Watcher[T]{ errCh: make(chan error, 1), } @@ -364,7 +397,7 @@ func WatchAll[T any](ctx context.Context, c *Client, q WildcardQuery[T], pred fu w.errCh <- err return w } - sub, err := subscribe[T](ctx, c, q, gpb.SubscriptionList_STREAM) + sub, err := subscribe[T](ctx, c, q, gpb.SubscriptionList_STREAM, resolveOpts(opts)) if err != nil { w.errCh <- err return w @@ -411,7 +444,7 @@ func WatchAll[T any](ctx context.Context, c *Client, q WildcardQuery[T], pred fu // CollectAll starts an asynchronous collection of the values at the query with a STREAM subscription. // Calling Await on the return Collection waits until the context is cancelled to elapse and returns the collected values. -func CollectAll[T any](ctx context.Context, c *Client, q WildcardQuery[T]) *Collector[T] { +func CollectAll[T any](ctx context.Context, c *Client, q WildcardQuery[T], opts ...Option) *Collector[T] { collect := &Collector[T]{} collect.w = WatchAll(ctx, c, q, func(v *Value[T]) error { if !q.isLeaf() { @@ -424,7 +457,7 @@ func CollectAll[T any](ctx context.Context, c *Client, q WildcardQuery[T]) *Coll } collect.data = append(collect.data, v) return Continue - }) + }, opts...) return collect } diff --git a/ygnmi/ygnmi_test.go b/ygnmi/ygnmi_test.go index d9e9ef4..559151d 100644 --- a/ygnmi/ygnmi_test.go +++ b/ygnmi/ygnmi_test.go @@ -424,6 +424,38 @@ func TestLookup(t *testing.T) { } }) } + t.Run("use get", func(t *testing.T) { + fakeGNMI.Stub().AppendGetResponse(&gpb.GetResponse{ + Notification: []*gpb.Notification{{ + Timestamp: 100, + Update: []*gpb.Update{{ + Path: leafPath, + Val: &gpb.TypedValue{Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: []byte(`"foo"`)}}, + }}, + }}, + }) + wantGetRequest := &gpb.GetRequest{ + Encoding: gpb.Encoding_JSON_IETF, + Type: gpb.GetRequest_STATE, + Prefix: &gpb.Path{}, + Path: []*gpb.Path{leafPath}, + } + wantVal := (&ygnmi.Value[string]{ + Path: leafPath, + Timestamp: time.Unix(0, 100), + }).SetVal("foo") + + got, err := ygnmi.Lookup(context.Background(), c, exampleocpath.Root().RemoteContainer().ALeaf().State(), ygnmi.WithUseGet()) + if err != nil { + t.Fatalf("Lookup() returned unexpected error: %v", err) + } + if diff := cmp.Diff(wantVal, got, cmp.AllowUnexported(ygnmi.Value[string]{}), cmpopts.IgnoreFields(ygnmi.Value[string]{}, "RecvTimestamp"), protocmp.Transform()); diff != "" { + t.Errorf("Lookup() returned unexpected diff: %s", diff) + } + if diff := cmp.Diff(wantGetRequest, fakeGNMI.GetRequests()[0], protocmp.Transform()); diff != "" { + t.Errorf("Lookup() GetRequest different from expected: %s", diff) + } + }) } func TestGet(t *testing.T) { @@ -489,6 +521,35 @@ func TestGet(t *testing.T) { } }) } + t.Run("use get", func(t *testing.T) { + fakeGNMI.Stub().AppendGetResponse(&gpb.GetResponse{ + Notification: []*gpb.Notification{{ + Timestamp: 100, + Update: []*gpb.Update{{ + Path: testutil.GNMIPath(t, "/remote-container/config/a-leaf"), + Val: &gpb.TypedValue{Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: []byte(`"foo"`)}}, + }}, + }}, + }) + wantGetRequest := &gpb.GetRequest{ + Encoding: gpb.Encoding_JSON_IETF, + Type: gpb.GetRequest_CONFIG, + Prefix: &gpb.Path{}, + Path: []*gpb.Path{testutil.GNMIPath(t, "/remote-container/config/a-leaf")}, + } + wantVal := "foo" + + got, err := ygnmi.Get[string](context.Background(), c, exampleocpath.Root().RemoteContainer().ALeaf().Config(), ygnmi.WithUseGet()) + if err != nil { + t.Fatalf("Get() returned unexpected error: %v", err) + } + if diff := cmp.Diff(wantVal, got, cmp.AllowUnexported(ygnmi.Value[string]{}), cmpopts.IgnoreFields(ygnmi.Value[string]{}, "RecvTimestamp"), protocmp.Transform()); diff != "" { + t.Errorf("Get() returned unexpected diff: %s", diff) + } + if diff := cmp.Diff(wantGetRequest, fakeGNMI.GetRequests()[0], protocmp.Transform()); diff != "" { + t.Errorf("Get() GetRequest different from expected: %s", diff) + } + }) } func TestWatch(t *testing.T) { @@ -505,6 +566,8 @@ func TestWatch(t *testing.T) { wantLastVal *ygnmi.Value[string] wantVals []*ygnmi.Value[string] wantErr string + wantMode gpb.SubscriptionMode + opts []ygnmi.Option }{{ desc: "single notif and pred true", stub: func(s *testutil.Stubber) { @@ -527,6 +590,30 @@ func TestWatch(t *testing.T) { Timestamp: startTime, Path: path, }).SetVal("foo"), + }, { + desc: "single notif and pred true with custom mode", + stub: func(s *testutil.Stubber) { + s.Notification(&gpb.Notification{ + Timestamp: startTime.UnixNano(), + Update: []*gpb.Update{{ + Path: path, + Val: &gpb.TypedValue{Value: &gpb.TypedValue_StringVal{StringVal: "foo"}}, + }}, + }).Sync() + }, + dur: time.Second, + opts: []ygnmi.Option{ygnmi.WithSubscriptionMode(gpb.SubscriptionMode_ON_CHANGE)}, + wantMode: gpb.SubscriptionMode_ON_CHANGE, + wantVals: []*ygnmi.Value[string]{ + (&ygnmi.Value[string]{ + Timestamp: startTime, + Path: path, + }).SetVal("foo")}, + wantSubscriptionPath: path, + wantLastVal: (&ygnmi.Value[string]{ + Timestamp: startTime, + Path: path, + }).SetVal("foo"), }, { desc: "single notif and pred false error EOF", stub: func(s *testutil.Stubber) { @@ -642,11 +729,12 @@ func TestWatch(t *testing.T) { return nil } return ygnmi.Continue - }) + }, tt.opts...) val, err := w.Await() if i < len(tt.wantVals) { t.Errorf("Predicate received too few values: got %d, want %d", i, len(tt.wantVals)) } + verifySubscriptionModesSent(t, fakeGNMI, tt.wantMode) if diff := errdiff.Substring(err, tt.wantErr); diff != "" { t.Fatalf("Await() returned unexpected diff: %s", diff) } @@ -886,6 +974,8 @@ func TestAwait(t *testing.T) { wantSubscriptionPath *gpb.Path wantVal *ygnmi.Value[string] wantErr string + wantMode gpb.SubscriptionMode + opts []ygnmi.Option }{{ desc: "value never equal", stub: func(s *testutil.Stubber) { @@ -917,13 +1007,33 @@ func TestAwait(t *testing.T) { Timestamp: startTime, Path: path, }).SetVal("foo"), + }, { + desc: "success with custom mode", + stub: func(s *testutil.Stubber) { + s.Notification(&gpb.Notification{ + Timestamp: startTime.UnixNano(), + Update: []*gpb.Update{{ + Path: path, + Val: &gpb.TypedValue{Value: &gpb.TypedValue_StringVal{StringVal: "foo"}}, + }}, + }).Sync() + }, + dur: time.Second, + opts: []ygnmi.Option{ygnmi.WithSubscriptionMode(gpb.SubscriptionMode_ON_CHANGE)}, + wantMode: gpb.SubscriptionMode_ON_CHANGE, + wantSubscriptionPath: path, + wantVal: (&ygnmi.Value[string]{ + Timestamp: startTime, + Path: path, + }).SetVal("foo"), }} for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { tt.stub(fakeGNMI.Stub()) ctx, cancel := context.WithTimeout(context.Background(), tt.dur) defer cancel() - val, err := ygnmi.Await(ctx, client, lq, "foo") + val, err := ygnmi.Await(ctx, client, lq, "foo", tt.opts...) + verifySubscriptionModesSent(t, fakeGNMI, tt.wantMode) if diff := errdiff.Substring(err, tt.wantErr); diff != "" { t.Fatalf("Await() returned unexpected diff: %s", diff) } @@ -1033,6 +1143,8 @@ func TestCollect(t *testing.T) { wantSubscriptionPath *gpb.Path wantVals []*ygnmi.Value[string] wantErr string + wantMode gpb.SubscriptionMode + opts []ygnmi.Option }{{ desc: "no values", stub: func(s *testutil.Stubber) { @@ -1076,13 +1188,45 @@ func TestCollect(t *testing.T) { Path: path, }).SetVal("bar"), }, + }, { + desc: "multiple values and custom mode", + stub: func(s *testutil.Stubber) { + s.Notification(&gpb.Notification{ + Timestamp: startTime.UnixNano(), + Update: []*gpb.Update{{ + Path: path, + Val: &gpb.TypedValue{Value: &gpb.TypedValue_StringVal{StringVal: "foo"}}, + }}, + }).Sync().Notification(&gpb.Notification{ + Timestamp: startTime.Add(time.Millisecond).UnixNano(), + Update: []*gpb.Update{{ + Path: path, + Val: &gpb.TypedValue{Value: &gpb.TypedValue_StringVal{StringVal: "bar"}}, + }}, + }) + }, + dur: 100 * time.Millisecond, + wantSubscriptionPath: path, + opts: []ygnmi.Option{ygnmi.WithSubscriptionMode(gpb.SubscriptionMode_ON_CHANGE)}, + wantMode: gpb.SubscriptionMode_ON_CHANGE, + wantErr: "EOF", + wantVals: []*ygnmi.Value[string]{ + (&ygnmi.Value[string]{ + Timestamp: startTime, + Path: path, + }).SetVal("foo"), + (&ygnmi.Value[string]{ + Timestamp: startTime.Add(time.Millisecond), + Path: path, + }).SetVal("bar"), + }, }} for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { tt.stub(fakeGNMI.Stub()) ctx, cancel := context.WithTimeout(context.Background(), tt.dur) defer cancel() - vals, err := ygnmi.Collect(ctx, client, lq).Await() + vals, err := ygnmi.Collect(ctx, client, lq, tt.opts...).Await() if diff := errdiff.Substring(err, tt.wantErr); diff != "" { t.Fatalf("Await() returned unexpected diff: %s", diff) } @@ -1424,6 +1568,38 @@ func TestLookupAll(t *testing.T) { } }) } + t.Run("use get", func(t *testing.T) { + fakeGNMI.Stub().AppendGetResponse(&gpb.GetResponse{ + Notification: []*gpb.Notification{{ + Timestamp: 100, + Update: []*gpb.Update{{ + Path: leafPath, + Val: &gpb.TypedValue{Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: []byte(`"1"`)}}, + }}, + }}, + }) + wantGetRequest := &gpb.GetRequest{ + Encoding: gpb.Encoding_JSON_IETF, + Type: gpb.GetRequest_STATE, + Prefix: &gpb.Path{}, + Path: []*gpb.Path{leafPath}, + } + wantVal := []*ygnmi.Value[int64]{(&ygnmi.Value[int64]{ + Path: leafPath, + Timestamp: time.Unix(0, 100), + }).SetVal(1)} + + got, err := ygnmi.LookupAll(context.Background(), c, lq, ygnmi.WithUseGet()) + if err != nil { + t.Fatalf("LookupAll() returned unexpected error: %v", err) + } + if diff := cmp.Diff(wantVal, got, cmp.AllowUnexported(ygnmi.Value[int64]{}), cmpopts.IgnoreFields(ygnmi.Value[int64]{}, "RecvTimestamp"), protocmp.Transform()); diff != "" { + t.Errorf("LookupAll() returned unexpected diff: %s", diff) + } + if diff := cmp.Diff(wantGetRequest, fakeGNMI.GetRequests()[0], protocmp.Transform()); diff != "" { + t.Errorf("LookupAll() GetRequest different from expected: %s", diff) + } + }) } func TestGetAll(t *testing.T) { @@ -1475,6 +1651,35 @@ func TestGetAll(t *testing.T) { } }) } + t.Run("use get", func(t *testing.T) { + fakeGNMI.Stub().AppendGetResponse(&gpb.GetResponse{ + Notification: []*gpb.Notification{{ + Timestamp: 100, + Update: []*gpb.Update{{ + Path: leafPath, + Val: &gpb.TypedValue{Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: []byte(`"1"`)}}, + }}, + }}, + }) + wantGetRequest := &gpb.GetRequest{ + Encoding: gpb.Encoding_JSON_IETF, + Type: gpb.GetRequest_STATE, + Prefix: &gpb.Path{}, + Path: []*gpb.Path{leafPath}, + } + wantVal := []int64{1} + + got, err := ygnmi.GetAll(context.Background(), c, exampleocpath.Root().Model().SingleKeyAny().Value().State(), ygnmi.WithUseGet()) + if err != nil { + t.Fatalf("Get() returned unexpected error: %v", err) + } + if diff := cmp.Diff(wantVal, got, cmp.AllowUnexported(ygnmi.Value[string]{}), cmpopts.IgnoreFields(ygnmi.Value[string]{}, "RecvTimestamp"), protocmp.Transform()); diff != "" { + t.Errorf("Get() returned unexpected diff: %s", diff) + } + if diff := cmp.Diff(wantGetRequest, fakeGNMI.GetRequests()[0], protocmp.Transform()); diff != "" { + t.Errorf("Get() GetRequest different from expected: %s", diff) + } + }) } func TestWatchAll(t *testing.T) { @@ -1493,6 +1698,8 @@ func TestWatchAll(t *testing.T) { wantLastVal *ygnmi.Value[int64] wantVals []*ygnmi.Value[int64] wantErr string + wantMode gpb.SubscriptionMode + opts []ygnmi.Option }{{ desc: "predicate not true", dur: time.Second, @@ -1550,6 +1757,41 @@ func TestWatchAll(t *testing.T) { Timestamp: startTime.Add(time.Millisecond), Path: key11Path, }).SetVal(101), + }, { + desc: "predicate becomes true with custom mode", + dur: time.Second, + stub: func(s *testutil.Stubber) { + s.Notification(&gpb.Notification{ + Timestamp: startTime.UnixNano(), + Update: []*gpb.Update{{ + Path: key10Path, + Val: &gpb.TypedValue{Value: &gpb.TypedValue_IntVal{IntVal: 100}}, + }}, + }).Sync().Notification(&gpb.Notification{ + Timestamp: startTime.Add(time.Millisecond).UnixNano(), + Update: []*gpb.Update{{ + Path: key11Path, + Val: &gpb.TypedValue{Value: &gpb.TypedValue_IntVal{IntVal: 101}}, + }}, + }) + }, + opts: []ygnmi.Option{ygnmi.WithSubscriptionMode(gpb.SubscriptionMode_ON_CHANGE)}, + wantMode: gpb.SubscriptionMode_ON_CHANGE, + wantSubscriptionPath: leafQueryPath, + wantVals: []*ygnmi.Value[int64]{ + (&ygnmi.Value[int64]{ + Timestamp: startTime, + Path: key10Path, + }).SetVal(100), + (&ygnmi.Value[int64]{ + Timestamp: startTime.Add(time.Millisecond), + Path: key11Path, + }).SetVal(101), + }, + wantLastVal: (&ygnmi.Value[int64]{ + Timestamp: startTime.Add(time.Millisecond), + Path: key11Path, + }).SetVal(101), }, { desc: "multiple values in notification", dur: time.Second, @@ -1634,7 +1876,7 @@ func TestWatchAll(t *testing.T) { return nil } return ygnmi.Continue - }) + }, tt.opts...) val, err := w.Await() if i < len(tt.wantVals) { t.Errorf("Predicate received too few values: got %d, want %d", i, len(tt.wantVals)) @@ -1642,6 +1884,7 @@ func TestWatchAll(t *testing.T) { if diff := errdiff.Substring(err, tt.wantErr); diff != "" { t.Fatalf("Await() returned unexpected diff: %s", diff) } + verifySubscriptionModesSent(t, fakeGNMI, tt.wantMode) if val != nil { checkJustReceived(t, val.RecvTimestamp) tt.wantLastVal.RecvTimestamp = val.RecvTimestamp @@ -1790,6 +2033,8 @@ func TestCollectAll(t *testing.T) { wantSubscriptionPath *gpb.Path wantVals []*ygnmi.Value[int64] wantErr string + wantMode gpb.SubscriptionMode + opts []ygnmi.Option }{{ desc: "no values", dur: time.Second, @@ -1799,6 +2044,17 @@ func TestCollectAll(t *testing.T) { wantErr: "EOF", wantSubscriptionPath: leafQueryPath, wantVals: nil, + }, { + desc: "no values with custom mode", + dur: time.Second, + stub: func(s *testutil.Stubber) { + s.Sync() + }, + opts: []ygnmi.Option{ygnmi.WithSubscriptionMode(gpb.SubscriptionMode_ON_CHANGE)}, + wantMode: gpb.SubscriptionMode_ON_CHANGE, + wantErr: "EOF", + wantSubscriptionPath: leafQueryPath, + wantVals: nil, }, { desc: "multiple values", dur: time.Second, @@ -1836,10 +2092,11 @@ func TestCollectAll(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), tt.dur) defer cancel() - vals, err := ygnmi.CollectAll(ctx, client, lq).Await() + vals, err := ygnmi.CollectAll(ctx, client, lq, tt.opts...).Await() if diff := errdiff.Substring(err, tt.wantErr); diff != "" { t.Fatalf("Await() returned unexpected diff: %s", diff) } + verifySubscriptionModesSent(t, fakeGNMI, tt.wantMode) for _, val := range vals { checkJustReceived(t, val.RecvTimestamp) } @@ -2719,3 +2976,22 @@ func verifySubscriptionPathsSent(t *testing.T, fakeGNMI *testutil.FakeGNMI, want t.Errorf("Subscription paths (-want, +got):\n%s", diff) } } + +// verifySubscriptionModesSent verifies the modes of the sent subscription requests is the same as wantModes. +func verifySubscriptionModesSent(t *testing.T, fakeGNMI *testutil.FakeGNMI, wantModes ...gpb.SubscriptionMode) { + t.Helper() + requests := fakeGNMI.Requests() + if len(requests) != 1 { + t.Errorf("Number of subscription requests sent is not 1: %v", requests) + return + } + + var gotModes []gpb.SubscriptionMode + req := requests[0].GetSubscribe() + for _, sub := range req.GetSubscription() { + gotModes = append(gotModes, sub.Mode) + } + if diff := cmp.Diff(wantModes, gotModes, protocmp.Transform()); diff != "" { + t.Errorf("Subscription modes (-want, +got):\n%s", diff) + } +}