Skip to content

Commit

Permalink
Add route reconciler
Browse files Browse the repository at this point in the history
  • Loading branch information
DanG100 committed Nov 28, 2022
1 parent f61fc8d commit 0f10759
Show file tree
Hide file tree
Showing 6 changed files with 341 additions and 31 deletions.
108 changes: 108 additions & 0 deletions dataplane/handlers/routes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package handlers

import (
"context"
"fmt"
"net"
"strconv"

"github.com/openconfig/lemming/dataplane/internal/engine"
"github.com/openconfig/lemming/gnmi"
"github.com/openconfig/lemming/gnmi/reconciler"
"github.com/openconfig/ygnmi/schemaless"
"github.com/openconfig/ygnmi/ygnmi"

log "github.com/golang/glog"
dpb "github.com/openconfig/lemming/proto/dataplane"
fwdpb "github.com/openconfig/lemming/proto/forwarding"
)

type route struct {
w *ygnmi.Watcher[*dpb.InsertRouteRequest]
fwd fwdpb.ServiceClient
}

// RouteQuery returns a ygnmi query for a route with the given prefix and vrf.
func RouteQuery(vrf uint64, prefix string) ygnmi.ConfigQuery[*dpb.InsertRouteRequest] {
q, err := schemaless.NewConfig[*dpb.InsertRouteRequest](fmt.Sprintf("/dataplane/routes/route[prefix=%s][vrf=%d]", prefix, vrf), gnmi.InternalOrigin)
if err != nil {
log.Fatal(err)
}
return q
}

var (
routesQuery ygnmi.WildcardQuery[*dpb.InsertRouteRequest]
)

// NewRoute returns a new route reconciler.
func NewRoute(fwd fwdpb.ServiceClient) *reconciler.BuiltReconciler {
r := &route{
fwd: fwd,
}
return reconciler.NewBuilder("dataplane-routes").WithStart(r.start).Build()
}

func (r *route) start(ctx context.Context, client *ygnmi.Client) error {
r.w = ygnmi.WatchAll(ctx, client, routesQuery, func(v *ygnmi.Value[*dpb.InsertRouteRequest]) error {
route, present := v.Val()
prefix := v.Path.Elem[2].Key["prefix"]
vrf, err := strconv.ParseUint(v.Path.Elem[2].Key["vrf"], 10, 64)
if err != nil {
log.Warningf("non-int vrf set in path: %v", err)
return ygnmi.Continue
}
if vrf != 0 {
log.Warningf("non-zero vrf")
return ygnmi.Continue
}

_, ipNet, err := net.ParseCIDR(prefix)
if err != nil {
log.Warningf("failed to parse prefix: %v", err)
return ygnmi.Continue
}
ip := ipNet.IP.To4()
isIPv4 := true
if ip == nil {
ip = ipNet.IP.To16()
isIPv4 = false
}

if !present {
if err := engine.DeleteIPRoute(ctx, r.fwd, isIPv4, ipNet.IP, ipNet.Mask); err != nil {
log.Warningf("failed to delete route")
return ygnmi.Continue
}
return ygnmi.Continue
}
if err := engine.AddIPRoute(ctx, r.fwd, isIPv4, ip, ipNet.Mask, route.GetNextHops()); err != nil {
log.Warningf("failed to delete route")
}

return ygnmi.Continue
})
return nil
}

func init() {
q, err := schemaless.NewWildcard[*dpb.InsertRouteRequest]("/dataplane/routes/route[prefix=*][vrf=*]", gnmi.InternalOrigin)
if err != nil {
log.Fatal(err)
}
routesQuery = q
}
2 changes: 1 addition & 1 deletion dataplane/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (d *Dataplane) Start(ctx context.Context, c gpb.GNMIClient, target string)
if err != nil {
return err
}
d.reconcilers = append(d.reconcilers, handlers.NewInterface(fc))
d.reconcilers = append(d.reconcilers, handlers.NewInterface(fc), handlers.NewRoute(fc))
d.fwd = fc
if err := engine.SetupForwardingTables(ctx, fc); err != nil {
return fmt.Errorf("failed to setup forwarding tables: %v", err)
Expand Down
57 changes: 57 additions & 0 deletions gnmi/gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,56 @@ func set(schema *ytypes.Schema, cache *cache.Cache, target string, req *gpb.SetR
return nil
}

const (
// InternalOrigin is a special gNMI path origin used to store schemaless values.
InternalOrigin = "lemming-internal"
)

// handleInternalOrigin handles SetRequests whose path has schemaless values.
func (s *Server) handleInternalOrigin(req *gpb.SetRequest) (bool, error) {
notif := &gpb.Notification{
Prefix: &gpb.Path{
Origin: InternalOrigin,
Elem: req.Prefix.Elem,
Target: req.Prefix.Target,
},
Timestamp: time.Now().UnixNano(),
}
var hasInternal bool

for _, del := range req.Delete {
if del.Origin == InternalOrigin {
hasInternal = true
notif.Delete = append(notif.Delete, del)
}
}
if hasInternal {
if err := s.c.cache.GnmiUpdate(notif); err != nil {
return true, err
}
}

notif.Delete = nil

for _, replace := range req.Replace {
if replace.Path.Origin == InternalOrigin {
hasInternal = true
notif.Update = append(notif.Update, replace)
}
}
for _, update := range req.Update {
if update.Path.Origin == InternalOrigin {
hasInternal = true
notif.Update = append(notif.Update, update)
}
}
log.V(2).Infof("internal origin notification: %v", notif)
if hasInternal {
return true, s.c.cache.GnmiUpdate(notif)
}
return false, nil
}

// Set implements lemming's gNMI Set operation.
//
// If the given SetRequest is schema compliant AND passes higher-level
Expand Down Expand Up @@ -292,6 +342,13 @@ func (s *Server) Set(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse
}
}

if found, err := s.handleInternalOrigin(req); found {
if err != nil {
return nil, status.Errorf(codes.Internal, "error handling set request with internal origin: %v", err)
}
return &gpb.SetResponse{}, nil
}

switch gnmiMode {
case ConfigMode:
s.configMu.Lock()
Expand Down
110 changes: 110 additions & 0 deletions gnmi/gnmi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,116 @@ func TestSetState(t *testing.T) {
}
}

func TestSetInternal(t *testing.T) {
ctx := context.Background()
gnmiServer, err := newServer(ctx, "local", true)
if err != nil {
t.Fatalf("cannot create server, got err: %v", err)
}
addr, err := startServer(gnmiServer)
if err != nil {
t.Fatalf("cannot start server, got err: %v", err)
}

pathStr := "/test/foo"
path := mustPath(pathStr)
path.Origin = InternalOrigin

got := []*upd{}
clientCtx, cancel := context.WithCancel(context.Background())
var sendErr, recvErr error
go func(ctx context.Context) {
defer cancel()
conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(local.NewCredentials()))
if err != nil {
sendErr = fmt.Errorf("cannot dial gNMI server, %v", err)
return
}

client := gpb.NewGNMIClient(conn)

if _, err := client.Set(metadata.AppendToOutgoingContext(ctx, TimestampMetadataKey, strconv.FormatInt(42, 10)), &gpb.SetRequest{
Prefix: mustTargetPath("local", "", false),
Replace: []*gpb.Update{{
Path: path,
Val: &gpb.TypedValue{Value: &gpb.TypedValue_StringVal{StringVal: "test"}},
}},
}); err != nil {
sendErr = fmt.Errorf("set request failed: %v", err)
return
}

subc, err := client.Subscribe(ctx)
if err != nil {
sendErr = err
return
}
sr := &gpb.SubscribeRequest{
Request: &gpb.SubscribeRequest_Subscribe{
Subscribe: &gpb.SubscriptionList{
Prefix: mustTargetPath("local", "", false),
Mode: gpb.SubscriptionList_ONCE,
Subscription: []*gpb.Subscription{{
Path: path,
}},
},
},
}

if err := subc.Send(sr); err != nil {
sendErr = fmt.Errorf("cannot send subscribe request %s, %v", prototext.Format(sr), err)
return
}

for {
in, err := subc.Recv()
if err == io.EOF {
return
}
if err != nil {
recvErr = err
return
}
got = append(got, toUpd(in)...)
}
}(clientCtx)

<-clientCtx.Done()

gnmiServer.c.Stop()

if sendErr != nil {
t.Errorf("got unexpected send error, %v", sendErr)
}

if recvErr != nil {
t.Errorf("got unexpected recv error, %v", recvErr)
}

if diff := cmp.Diff(got, []*upd{{
T: VAL,
TS: 42,
Path: pathStr,
Val: "test",
}, {
T: SYNC,
}}, cmpopts.IgnoreFields(upd{}, "TS")); diff != "" {
t.Fatalf("did not get expected updates, diff(-got,+want)\n:%s", diff)
}

// Test that timestamp is not 42: we don't want the timestamp metadata to affect config values.
if cmp.Equal(got, []*upd{{
T: VAL,
TS: 42,
Path: pathStr,
Val: "world",
}, {
T: SYNC,
}}) {
t.Fatalf("Expected error -- timestamp metadata should be ignored but it is not ignored.")
}
}

// TestSTREAM tests the STREAM mode of gnmit.
func TestSTREAM(t *testing.T) {
ctx := context.Background()
Expand Down
21 changes: 10 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,18 @@ require (
github.com/openconfig/goyang v1.1.0
github.com/openconfig/gribi v0.1.1-0.20220622162620-08d53dffce45
github.com/openconfig/gribigo v0.0.0-20220802181317-805e943d8714
github.com/openconfig/kne v0.1.6
github.com/openconfig/ondatra v0.0.0-20221104001721-66d3c2d6da80
github.com/openconfig/ygnmi v0.4.0
github.com/openconfig/ygot v0.25.2
github.com/openconfig/kne v0.1.5
github.com/openconfig/ygnmi v0.6.1
github.com/openconfig/ygot v0.25.3
github.com/p4lang/p4runtime v1.3.0
github.com/sirupsen/logrus v1.8.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.13.0
github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5
github.com/wenovus/gobgp/v3 v3.0.0-20221024234659-8df3c6938ab7
golang.org/x/exp v0.0.0-20221002003631-540bb7301a08
golang.org/x/net v0.0.0-20221004154528-8021a29435af
golang.org/x/sys v0.0.0-20221010170243-090e33056c14
golang.org/x/exp v0.0.0-20221106115401-f9659909a136
golang.org/x/net v0.1.0
golang.org/x/sys v0.1.0
google.golang.org/grpc v1.50.0
google.golang.org/protobuf v1.28.1
k8s.io/api v0.24.3
Expand Down Expand Up @@ -144,11 +143,11 @@ require (
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.19.1 // indirect
go.universe.tf/metallb v0.13.5 // indirect
golang.org/x/crypto v0.0.0-20220518034528-6f7dac969898 // indirect
go.universe.tf/metallb v0.10.3 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.8 // indirect
golang.org/x/term v0.1.0 // indirect
golang.org/x/text v0.4.0 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
Loading

0 comments on commit 0f10759

Please sign in to comment.