Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
DanG100 committed Aug 29, 2023
1 parent c4d17c3 commit fbf083a
Showing 1 changed file with 18 additions and 15 deletions.
33 changes: 18 additions & 15 deletions ygnmi/ygnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ func Get[T any](ctx context.Context, c *Client, q SingletonQuery[T], opts ...Opt
type Watcher[T any] struct {
errCh chan error
lastVal *Value[T]
cancel context.CancelFunc
}

// Await waits for the watch to finish and returns the last received value
Expand All @@ -304,7 +303,6 @@ func (w *Watcher[T]) Await() (*Value[T], error) {
if !ok {
return nil, fmt.Errorf("Await already called and Watcher is closed")
}
w.cancel()
close(w.errCh)
return w.lastVal, err
}
Expand All @@ -315,27 +313,29 @@ func (w *Watcher[T]) Await() (*Value[T], error) {
// Calling Await on the returned Watcher waits for the subscription to complete.
// It returns the last observed value and a boolean that indicates whether that value satisfies the predicate.
func Watch[T any](ctx context.Context, c *Client, q SingletonQuery[T], pred func(*Value[T]) error, opts ...Option) *Watcher[T] {
wCtx, cancel := context.WithCancel(ctx)
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
w := &Watcher[T]{
errCh: make(chan error, 1),
cancel: cancel,
errCh: make(chan error, 1),
}

resolvedOpts := resolveOpts(opts)
sub, err := subscribe[T](wCtx, c, q, gpb.SubscriptionList_STREAM, resolvedOpts)
sub, err := subscribe[T](ctx, c, q, gpb.SubscriptionList_STREAM, resolvedOpts)
if err != nil {
cancel()
w.errCh <- err
return w
}

dataCh, errCh := receiveStream[T](wCtx, sub, q)
dataCh, errCh := receiveStream[T](ctx, sub, q)
go func() {
defer cancel()
// Create an intially empty GoStruct, into which all received datapoints will be unmarshalled.
gs := q.goStruct()
for {
select {
case <-wCtx.Done():
w.errCh <- wCtx.Err()
case <-ctx.Done():
w.errCh <- ctx.Err()
return
case data := <-dataCh:
val, err := unmarshalAndExtract[T](data, q, gs, resolvedOpts)
Expand Down Expand Up @@ -469,30 +469,33 @@ func GetAll[T any](ctx context.Context, c *Client, q WildcardQuery[T], opts ...O
// Calling Await on the returned Watcher waits for the subscription to complete.
// It returns the last observed value and a boolean that indicates whether that value satisfies the predicate.
func WatchAll[T any](ctx context.Context, c *Client, q WildcardQuery[T], pred func(*Value[T]) error, opts ...Option) *Watcher[T] {
wCtx, cancel := context.WithCancel(ctx)
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
w := &Watcher[T]{
errCh: make(chan error, 1),
cancel: cancel,
errCh: make(chan error, 1),
}
path, err := resolvePath(q.PathStruct())
if err != nil {
cancel()
w.errCh <- err
return w
}
resolvedOpts := resolveOpts(opts)
sub, err := subscribe[T](wCtx, c, q, gpb.SubscriptionList_STREAM, resolvedOpts)
sub, err := subscribe[T](ctx, c, q, gpb.SubscriptionList_STREAM, resolvedOpts)
if err != nil {
cancel()
w.errCh <- err
return w
}

dataCh, errCh := receiveStream[T](wCtx, sub, q)
dataCh, errCh := receiveStream[T](ctx, sub, q)
go func() {
defer cancel()
// Create a map intially empty GoStruct, into which all received datapoints will be unmarshalled based on their path prefixes.
structs := map[string]ygot.ValidatedGoStruct{}
for {
select {
case <-wCtx.Done():
case <-ctx.Done():
w.errCh <- ctx.Err()
return
case data := <-dataCh:
Expand Down

0 comments on commit fbf083a

Please sign in to comment.