Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Annotate kubernetes resources #82

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 19 additions & 17 deletions artifacts/yamls/k8s/01_service_account.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,24 @@ metadata:
creationTimestamp: null
name: egress-watcher-role
rules:
- apiGroups:
- networking.istio.io
resources:
- serviceentries
verbs:
- watch
- get
- list
- apiGroups:
- networking.k8s.io
resources:
- networkpolicies
verbs:
- watch
- get
- list
- apiGroups:
- networking.istio.io
resources:
- serviceentries
verbs:
- watch
- get
- list
- update
- apiGroups:
- networking.k8s.io
resources:
- networkpolicies
verbs:
- watch
- get
- list
- update
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
Expand All @@ -40,4 +42,4 @@ subjects:
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: egress-watcher-role
name: egress-watcher-role
153 changes: 153 additions & 0 deletions pkg/annotations/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright (c) 2023 Cisco Systems, Inc. and its affiliates
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package annotations

import (
"context"
"fmt"
"time"

"github.com/rs/zerolog"
"istio.io/client-go/pkg/apis/networking/v1beta1"
v1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
insertedAnnotation string = "egress-watcher.cnwan.io/sdwan-populated"
enabledAnnotation string = "egress-watcher.cnwan.io/sdwan-enabled"
)

type handler struct {
client.Client
log zerolog.Logger
}

type OperationType string

const (
// OperationEnabled means that the object has been enabled, i.e. the SDWAN
// applied configuration and policies to optimize traffic for this
// application.
OperationEnabled OperationType = "enabled"
// OperationInserted means that the object has been inserted into SDWAN's
// database.
OperationInserted OperationType = "inserted"
// OperationDisabled is the opposite OperationEnabled, but the application
// still is present in SDWAN's database.
OperationDisabled OperationType = "disabled"
// OperationRemoved means that the application/object was removed from
// SDWAN's database.
OperationRemoved OperationType = "removed"
)

// TODO: might use the definition from each of these packages instead of
// defining them as strings here.
type ObjectType string

const (
ServiceEntry ObjectType = "serviceentry"
NetworkPolicy ObjectType = "networkpolicy"
)

// Operation contains data about the operation just performed by SDWAN that
// must be reflected as annotation on the object.
type Operation struct {
// The object to annotate.
Object Object
// The type of the operation that was performed.
Type OperationType
}

// Object that originated this event.
type Object struct {
// Name of the object.
Name types.NamespacedName
// Type of object.
Type ObjectType
}

func WatchForUpdates(ctx context.Context, k8sclient client.Client, opsChan chan *Operation, log zerolog.Logger) error {
h := &handler{k8sclient, log.With().Str("worker", "Annotations Handler").Logger()}

for {
select {
case <-ctx.Done():
return context.Canceled
case op := <-opsChan:
obj, err := h.getObject(ctx, op.Object.Name, op.Object.Type)
if client.IgnoreNotFound(err) != nil {
h.log.Err(err).Str("name", op.Object.Name.String()).Msg("cannot get object to annotate: skipping...")
continue
}

if err := h.handleAnnotations(ctx, obj, op.Type); err != nil {
h.log.Err(err).Msg("could not set annotation")
}
}
}
}

func (a *handler) getObject(mainCtx context.Context, name types.NamespacedName, objType ObjectType) (client.Object, error) {
ctx, canc := context.WithTimeout(mainCtx, 30*time.Second)
defer canc()

switch objType {
case ServiceEntry:
var svcEntry v1beta1.ServiceEntry
if err := a.Get(ctx, name, &svcEntry); err != nil {
return nil, fmt.Errorf("cannot get object '%s': %w", name, err)
}

return &svcEntry, nil
case NetworkPolicy:
var netpol v1.NetworkPolicy
if err := a.Get(ctx, name, &netpol); err != nil {
return nil, fmt.Errorf("cannot get object '%s': %w", name, err)
}

return &netpol, nil
default:
return nil, fmt.Errorf("'%s' is not a valid object type", objType)
}
}

func (a *handler) handleAnnotations(mainCtx context.Context, obj client.Object, opType OperationType) error {
ctx, canc := context.WithTimeout(mainCtx, 30*time.Second)
defer canc()

now := time.Now().UTC().Format(time.RFC3339)
annotations := obj.GetAnnotations()
switch opType {
case OperationInserted:
if _, exists := annotations[enabledAnnotation]; !exists {
annotations[insertedAnnotation] = now
}
case OperationEnabled:
annotations[enabledAnnotation] = now
delete(annotations, insertedAnnotation)
case OperationDisabled:
delete(annotations, enabledAnnotation)
case OperationRemoved:
delete(annotations, insertedAnnotation)
}

obj.SetAnnotations(annotations)
return a.Update(ctx, obj, &client.UpdateOptions{})
}
26 changes: 22 additions & 4 deletions pkg/command/run.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022 Cisco Systems, Inc. and its affiliates
// Copyright (c) 2022, 2023 Cisco Systems, Inc. and its affiliates
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -28,6 +28,7 @@ import (
"syscall"
"time"

"github.com/CloudNativeSDWAN/egress-watcher/pkg/annotations"
"github.com/CloudNativeSDWAN/egress-watcher/pkg/controllers"
"github.com/CloudNativeSDWAN/egress-watcher/pkg/sdwan"
"github.com/CloudNativeSDWAN/egress-watcher/pkg/sdwan/vmanage"
Expand Down Expand Up @@ -290,7 +291,9 @@ func runWithVmanage(kopts *kubeConfigOptions, opts *Options) error {
}

opsChan := make(chan *sdwan.Operation, 100)
annsChan := make(chan *annotations.Operation, 100)
defer close(opsChan)
defer close(annsChan)

_, err = controllers.NewServiceEntryController(mgr, opts.ServiceEntryController, opsChan, log)
if err != nil {
Expand All @@ -305,11 +308,25 @@ func runWithVmanage(kopts *kubeConfigOptions, opts *Options) error {
}

exitWatch := make(chan struct{})

go func() {
defer func() {
exitWatch <- struct{}{}
}()

if err = annotations.WatchForUpdates(ctx, mgr.GetClient(), annsChan, log); err != nil {
log.Err(err).Msg("error while watching for annotation operations")
return
}
}()

go func() {
defer close(exitWatch)
defer func() {
exitWatch <- struct{}{}
}()

if err = vclient.WatchForOperations(ctx, opsChan, *opts.Sdwan.WaitingWindow, log); err != nil {
log.Err(err).Msg("error while watch for operations")
if err = vclient.WatchForOperations(ctx, opsChan, annsChan, *opts.Sdwan.WaitingWindow, log); err != nil {
log.Err(err).Msg("error while watching for sdwan operations")
return
}
}()
Expand All @@ -330,6 +347,7 @@ func runWithVmanage(kopts *kubeConfigOptions, opts *Options) error {

canc()
<-exitChan
<-exitChan
log.Info().Msg("good bye!")

return nil
Expand Down
22 changes: 21 additions & 1 deletion pkg/controllers/network_policy.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022 Cisco Systems, Inc. and its affiliates
// Copyright (c) 2022, 2023 Cisco Systems, Inc. and its affiliates
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -23,9 +23,11 @@ import (
"net"
"reflect"

"github.com/CloudNativeSDWAN/egress-watcher/pkg/annotations"
"github.com/CloudNativeSDWAN/egress-watcher/pkg/sdwan"
"github.com/rs/zerolog"
netv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/validation"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -87,6 +89,11 @@ func (n *netPolsEventHandler) Update(ue event.UpdateEvent, wq workqueue.RateLimi
return
}

obj := annotations.Object{
Name: types.NamespacedName{Name: curr.Name, Namespace: curr.Namespace},
Type: annotations.NetworkPolicy,
}

currParsedIps := getIps(curr)
oldParsedIps := getIps(old)

Expand All @@ -113,6 +120,7 @@ func (n *netPolsEventHandler) Update(ue event.UpdateEvent, wq workqueue.RateLimi
Type: sdwan.OperationRemove,
ApplicationName: curr.Name,
Servers: oldParsedIps,
OriginalObject: obj,
}
return
} else {
Expand All @@ -134,6 +142,7 @@ func (n *netPolsEventHandler) Update(ue event.UpdateEvent, wq workqueue.RateLimi
Type: sdwan.OperationRemove,
ApplicationName: curr.Name,
Servers: oldParsedIps,
OriginalObject: obj,
}
return
}
Expand All @@ -156,6 +165,7 @@ func (n *netPolsEventHandler) Update(ue event.UpdateEvent, wq workqueue.RateLimi
Type: sdwan.OperationRemove,
ApplicationName: curr.Name,
Servers: oldParsedIps,
OriginalObject: obj,
}

return
Expand All @@ -167,6 +177,7 @@ func (n *netPolsEventHandler) Update(ue event.UpdateEvent, wq workqueue.RateLimi
Type: sdwan.OperationRemove,
ApplicationName: curr.Name,
Servers: oldParsedIps,
OriginalObject: obj,
}
}

Expand All @@ -180,6 +191,7 @@ func (n *netPolsEventHandler) Update(ue event.UpdateEvent, wq workqueue.RateLimi
Type: sdwan.OperationAdd,
ApplicationName: curr.Name,
Servers: currParsedIps,
OriginalObject: obj,
}
}

Expand Down Expand Up @@ -215,6 +227,10 @@ func (n *netPolsEventHandler) Delete(de event.DeleteEvent, wq workqueue.RateLimi
Type: sdwan.OperationRemove,
ApplicationName: netpol.Name,
Servers: parsedIps,
OriginalObject: annotations.Object{
Name: types.NamespacedName{Name: netpol.Name, Namespace: netpol.Namespace},
Type: annotations.NetworkPolicy,
},
}
}

Expand Down Expand Up @@ -252,6 +268,10 @@ func (n *netPolsEventHandler) Create(ce event.CreateEvent, wq workqueue.RateLimi
Type: sdwan.OperationAdd,
ApplicationName: netpol.Name,
Servers: parsedIps,
OriginalObject: annotations.Object{
Name: types.NamespacedName{Name: netpol.Name, Namespace: netpol.Namespace},
Type: annotations.NetworkPolicy,
},
}
}

Expand Down
Loading