Skip to content

Commit

Permalink
Revert "feat: TypeMeta as primary object Kind value source (#198)" (#200
Browse files Browse the repository at this point in the history
)

This reverts commit ab92482.
  • Loading branch information
pijusn-cast authored Nov 7, 2024
1 parent 5ef39e1 commit 98d248f
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 7 deletions.
4 changes: 2 additions & 2 deletions internal/services/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func CollectSingleSnapshot(ctx context.Context,
return nil, err
}

log.Debugf("synced %d items", len(d.Cache))
log.Debugf("synced %d items", len(d.CacheLegacy))

return d.ToCASTAIRequest(), nil
}
Expand Down Expand Up @@ -514,7 +514,7 @@ func (c *Controller) send(ctx context.Context) {
nodesByName := map[string]*corev1.Node{}
var nodes []*corev1.Node

for _, item := range c.delta.Cache {
for _, item := range c.delta.CacheLegacy {
n, ok := item.Obj.(*corev1.Node)
if !ok {
continue
Expand Down
126 changes: 121 additions & 5 deletions internal/services/controller/delta/delta.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package delta

import (
"crypto/sha256"
"encoding/json"
"fmt"
"reflect"
"time"

"github.com/sirupsen/logrus"
Expand All @@ -23,7 +25,8 @@ func New(log logrus.FieldLogger, clusterID, clusterVersion, agentVersion string)
clusterVersion: clusterVersion,
agentVersion: agentVersion,
FullSnapshot: true,
Cache: map[string]*Item{},
CacheLegacy: map[string]*Item{},
CacheModern: map[string]*Item{},
}
}

Expand All @@ -35,12 +38,18 @@ type Delta struct {
clusterVersion string
agentVersion string
FullSnapshot bool
Cache map[string]*Item
CacheLegacy map[string]*Item
CacheModern map[string]*Item
}

// Add will add an Item to the Delta Cache. It will debounce the objects.
func (d *Delta) Add(i *Item) {
cache := d.Cache
d.addToLegacy(i)
d.addToModern(i)
}

func (d *Delta) addToModern(i *Item) {
cache := d.CacheModern

if len(i.kind) == 0 {
gvk, err := determineObjectGVK(i.Obj)
Expand All @@ -64,6 +73,26 @@ func (d *Delta) Add(i *Item) {
}
}

func (d *Delta) addToLegacy(i *Item) {
cache := d.CacheLegacy

key := keyObject(i.Obj)

if other, ok := cache[key]; ok && other.event == castai.EventAdd && i.event == castai.EventUpdate {
i.event = castai.EventAdd
cache[key] = i
} else if ok && other.event == castai.EventDelete && (i.event == castai.EventAdd || i.event == castai.EventUpdate) {
i.event = castai.EventUpdate
cache[key] = i
} else {
cache[key] = i
}
}

func keyObject(obj Object) string {
return fmt.Sprintf("%s::%s/%s", reflect.TypeOf(obj).String(), obj.GetNamespace(), obj.GetName())
}

func itemCacheKey(i *Item) string {
return fmt.Sprintf("%s::%s/%s", i.kind, i.Obj.GetNamespace(), i.Obj.GetName())
}
Expand All @@ -72,14 +101,22 @@ func itemCacheKey(i *Item) string {
// delivered.
func (d *Delta) Clear() {
d.FullSnapshot = false
d.Cache = map[string]*Item{}
d.CacheLegacy = map[string]*Item{}
d.CacheModern = map[string]*Item{}
}

// ToCASTAIRequest maps the collected Delta Cache to the castai.Delta type.
func (d *Delta) ToCASTAIRequest() *castai.Delta {
resultLegacy := d.toCASTAIRequestLegacy()
resultModern := d.toCASTAIRequestModern()
logMismatches(d.log, resultLegacy, resultModern)
return resultLegacy
}

func (d *Delta) toCASTAIRequestModern() *castai.Delta {
var items []*castai.DeltaItem

for _, i := range d.Cache {
for _, i := range d.CacheModern {
data, err := Encode(i.Obj)
if err != nil {
d.log.Errorf("failed to encode %T: %v", i.Obj, err)
Expand All @@ -102,6 +139,43 @@ func (d *Delta) ToCASTAIRequest() *castai.Delta {
}
}

func (d *Delta) toCASTAIRequestLegacy() *castai.Delta {
var items []*castai.DeltaItem

for _, i := range d.CacheLegacy {
data, err := Encode(i.Obj)
if err != nil {
d.log.Errorf("failed to encode %T: %v", i.Obj, err)
continue
}

kinds, _, err := scheme.Scheme.ObjectKinds(i.Obj)
if err != nil {
d.log.Errorf("failed to find Object %T kind: %v", i.Obj, err)
continue
}
if len(kinds) == 0 || kinds[0].Kind == "" {
d.log.Errorf("unknown Object kind for Object %T", i.Obj)
continue
}

items = append(items, &castai.DeltaItem{
Event: i.event,
Kind: kinds[0].Kind,
Data: data,
CreatedAt: time.Now().UTC(),
})
}

return &castai.Delta{
ClusterID: d.clusterID,
ClusterVersion: d.clusterVersion,
AgentVersion: d.agentVersion,
FullSnapshot: d.FullSnapshot,
Items: items,
}
}

func Encode(obj interface{}) (*json.RawMessage, error) {
b, err := json.Marshal(obj)
if err != nil {
Expand Down Expand Up @@ -132,6 +206,48 @@ func determineObjectGVK(obj runtime.Object) (schema.GroupVersionKind, error) {
return kinds[0], nil
}

func logMismatches(log logrus.FieldLogger, legacy *castai.Delta, modern *castai.Delta) {
if len(legacy.Items) != len(modern.Items) {
log.Warnf("delta_modern number of items mismatch: %d legacy.Items vs %d modern.Items", len(legacy.Items), len(modern.Items))
}

checkLegacy := toChecksumMap(legacy)
checkModern := toChecksumMap(modern)

for key, _ := range checkLegacy {
if checkModern[key] == nil {
log.Warnf("delta_modern item mismatch: legacy list has item that is missing from modern list: %s", key)
}
delete(checkLegacy, key)
delete(checkModern, key)
}

for key, _ := range checkModern {
if checkLegacy[key] == nil {
log.Warnf("delta_modern item mismatch: modern list has item that is missing from legacy list: %s", key)
}
delete(checkLegacy, key)
delete(checkModern, key)
}
}

func toChecksumMap(delta *castai.Delta) map[string]*castai.DeltaItem {
out := map[string]*castai.DeltaItem{}
for _, i := range delta.Items {
var hash string
if i.Data != nil {
hash = sha256hash(*i.Data)
}
key := fmt.Sprintf("%s-%s-%s", i.Event, i.Kind, hash)
out[key] = i
}
return out
}

func sha256hash(data []byte) string {
return fmt.Sprintf("%x", sha256.Sum256(data))
}

type Object interface {
runtime.Object
metav1.Object
Expand Down

0 comments on commit 98d248f

Please sign in to comment.