Skip to content

Commit

Permalink
Merge pull request #31 from vshn/ns-list
Browse files Browse the repository at this point in the history
Adds filtering by namespace and organization labels.
  • Loading branch information
susana-garcia authored Dec 13, 2022
2 parents 4c1a032 + 5ace4d1 commit 8bcf772
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 50 deletions.
8 changes: 6 additions & 2 deletions pkg/clients/cluster/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@ import (
"fmt"

"github.com/vshn/provider-exoscale/apis"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// InitK8sClient creates a k8s client from the server url and token url
func InitK8sClient(url, token string) (*client.Client, error) {
func InitK8sClient(url, token string) (client.Client, error) {
scheme := runtime.NewScheme()
if err := corev1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("core scheme: %w", err)
}
err := apis.AddToScheme(scheme)
if err != nil {
return nil, fmt.Errorf("cannot add k8s exoscale scheme: %w", err)
Expand All @@ -24,7 +28,7 @@ func InitK8sClient(url, token string) (*client.Client, error) {
if err != nil {
return nil, fmt.Errorf("cannot initialize k8s client: %w", err)
}
return &k8sClient, nil
return k8sClient, nil
}

// InitK8sClientDynamic creates a dynamic k8s client from the server url and token url
Expand Down
99 changes: 70 additions & 29 deletions pkg/service/dbaas/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package dbaas
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
"time"

"k8s.io/apimachinery/pkg/api/errors"

pipeline "github.com/ccremer/go-command-pipeline"
"github.com/go-logr/logr"
"github.com/vshn/exoscale-metrics-collector/pkg/clients/exoscale"
"github.com/vshn/exoscale-metrics-collector/pkg/database"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"

Expand Down Expand Up @@ -89,7 +92,7 @@ func (s *Service) Execute(ctx context.Context) error {

p := pipeline.NewPipeline[*Context]()
p.WithSteps(
p.NewStep("Fetch cluster managed DBaaS", s.fetchManagedDBaaS),
p.NewStep("Fetch cluster managed DBaaS and namespaces", s.fetchManagedDBaaSAndNamespaces),
p.NewStep("Fetch exoscale DBaaS usage", s.fetchDBaaSUsage),
p.NewStep("Aggregate DBaaS services by namespace and plan", aggregateDBaaS),
p.WithNestedSteps("Save to billing database", hasAggregatedInstances,
Expand All @@ -101,10 +104,16 @@ func (s *Service) Execute(ctx context.Context) error {
return p.RunWithContext(&Context{Context: ctx})
}

// fetchManagedDBaaS fetches instances from kubernetes cluster
func (s *Service) fetchManagedDBaaS(ctx *Context) error {
// fetchManagedDBaaSAndNamespaces fetches instances and namespaces from kubernetes cluster
func (s *Service) fetchManagedDBaaSAndNamespaces(ctx *Context) error {
log := ctrl.LoggerFrom(ctx)

log.V(1).Info("Listing namespaces from cluster")
namespaces, err := fetchNamespaceWithOrganizationMap(ctx, s.k8sClient)
if err != nil {
return fmt.Errorf("cannot list namespaces: %w", err)
}

var dbaasDetails []Detail
for _, groupVersionResource := range groupVersionResources {
managedResources, err := s.k8sClient.Resource(groupVersionResource).List(ctx, metav1.ListOptions{})
Expand All @@ -116,40 +125,48 @@ func (s *Service) fetchManagedDBaaS(ctx *Context) error {
}

for _, managedResource := range managedResources.Items {
dbaasDetail := Detail{
DBName: managedResource.GetName(),
Type: groupVersionResource.Resource,
}
if organization, exist := managedResource.GetLabels()[service.OrganizationLabel]; exist {
dbaasDetail.Organization = organization
} else {
// cannot get organization from DBaaS
log.Info("Organization label is missing in DBaaS service, skipping...",
"label", service.OrganizationLabel,
"dbaas", managedResource.GetName())
continue
}
if namespace, exist := managedResource.GetLabels()[service.NamespaceLabel]; exist {
dbaasDetail.Namespace = namespace
} else {
// cannot get namespace from DBaaS
log.Info("Namespace label is missing in DBaaS, skipping...",
"label", service.NamespaceLabel,
"dbaas", managedResource.GetName())
dbaasDetail := findDBaaSDetailInNamespacesMap(managedResource, groupVersionResource, namespaces, log)
if dbaasDetail == nil {
continue
}
log.V(1).Info("Added namespace and organization to DBaaS",
"dbaas", managedResource.GetName(),
"namespace", dbaasDetail.Namespace,
"organization", dbaasDetail.Organization)
dbaasDetails = append(dbaasDetails, dbaasDetail)
dbaasDetails = append(dbaasDetails, *dbaasDetail)
}
}

ctx.dbaasDetails = dbaasDetails
return nil
}

func findDBaaSDetailInNamespacesMap(managedResource unstructured.Unstructured, groupVersionResource schema.GroupVersionResource, namespaces map[string]string, log logr.Logger) *Detail {
dbaasDetail := Detail{
DBName: managedResource.GetName(),
Type: groupVersionResource.Resource,
}
if namespace, exist := managedResource.GetLabels()[service.NamespaceLabel]; exist {
organization, ok := namespaces[namespace]
if !ok {
// cannot find namespace in namespace list
log.Info("Namespace not found in namespace list, skipping...",
"namespace", namespace,
"dbaas", managedResource.GetName())
return nil
}
dbaasDetail.Namespace = namespace
dbaasDetail.Organization = organization
} else {
// cannot get namespace from DBaaS
log.Info("Namespace label is missing in DBaaS, skipping...",
"label", service.NamespaceLabel,
"dbaas", managedResource.GetName())
return nil
}
log.V(1).Info("Added namespace and organization to DBaaS",
"dbaas", managedResource.GetName(),
"namespace", dbaasDetail.Namespace,
"organization", dbaasDetail.Organization)
return &dbaasDetail
}

// fetchDBaaSUsage gets DBaaS service usage from Exoscale
func (s *Service) fetchDBaaSUsage(ctx *Context) error {
log := ctrl.LoggerFrom(ctx)
Expand Down Expand Up @@ -237,3 +254,27 @@ func hasAggregatedInstances(ctx *Context) bool {
}
return true
}

func fetchNamespaceWithOrganizationMap(ctx context.Context, k8sClient dynamic.Interface) (map[string]string, error) {
log := ctrl.LoggerFrom(ctx)
nsGroupVersionResource := schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "namespaces",
}
list, err := k8sClient.Resource(nsGroupVersionResource).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("cannot get namespace list: %w", err)
}

namespaces := map[string]string{}
for _, ns := range list.Items {
org, ok := ns.GetLabels()[service.OrganizationLabel]
if !ok {
log.Info("Organization label not found in namespace", "namespace", ns.GetName())
continue
}
namespaces[ns.GetName()] = org
}
return namespaces, nil
}
60 changes: 44 additions & 16 deletions pkg/service/sos/objectstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
egoscale "github.com/exoscale/egoscale/v2"
db "github.com/vshn/exoscale-metrics-collector/pkg/database"
exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
k8s "sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -32,10 +34,10 @@ type BucketDetail struct {
}

// NewObjectStorage creates an ObjectStorage with the initial setup
func NewObjectStorage(exoscaleClient *egoscale.Client, k8sClient *k8s.Client, databaseURL string) ObjectStorage {
func NewObjectStorage(exoscaleClient *egoscale.Client, k8sClient k8s.Client, databaseURL string) ObjectStorage {
return ObjectStorage{
exoscaleClient: exoscaleClient,
k8sClient: *k8sClient,
k8sClient: k8sClient,
database: &db.SosDatabase{
Database: db.Database{
URL: databaseURL,
Expand All @@ -51,7 +53,7 @@ func (o *ObjectStorage) Execute(ctx context.Context) error {

p := pipeline.NewPipeline[context.Context]()
p.WithSteps(
p.NewStep("Fetch managed buckets", o.fetchManagedBuckets),
p.NewStep("Fetch managed buckets and namespaces", o.fetchManagedBucketsAndNamespaces),
p.NewStep("Get bucket usage", o.getBucketUsage),
p.NewStep("Get billing date", o.getBillingDate),
p.NewStep("Save to database", o.saveToDatabase),
Expand Down Expand Up @@ -79,17 +81,24 @@ func (o *ObjectStorage) getBucketUsage(ctx context.Context) error {
return nil
}

func (o *ObjectStorage) fetchManagedBuckets(ctx context.Context) error {
func (o *ObjectStorage) fetchManagedBucketsAndNamespaces(ctx context.Context) error {
log := ctrl.LoggerFrom(ctx)
log.Info("Fetching buckets from cluster")
log.Info("Fetching buckets and namespaces from cluster")

buckets := exoscalev1.BucketList{}
log.V(1).Info("Listing buckets from cluster")
err := o.k8sClient.List(ctx, &buckets)
if err != nil {
return fmt.Errorf("cannot list buckets: %w", err)
}
o.bucketDetails = addOrgAndNamespaceToBucket(ctx, buckets)

log.V(1).Info("Listing namespaces from cluster")
namespaces, err := fetchNamespaceWithOrganizationMap(ctx, o.k8sClient)
if err != nil {
return fmt.Errorf("cannot list namespaces: %w", err)
}

o.bucketDetails = addOrgAndNamespaceToBucket(ctx, buckets, namespaces)
return nil
}

Expand Down Expand Up @@ -148,7 +157,7 @@ func getAggregatedBuckets(ctx context.Context, sosBucketsUsage []oapi.SosBucketU
return aggregatedBuckets
}

func addOrgAndNamespaceToBucket(ctx context.Context, buckets exoscalev1.BucketList) []BucketDetail {
func addOrgAndNamespaceToBucket(ctx context.Context, buckets exoscalev1.BucketList, namespaces map[string]string) []BucketDetail {
log := ctrl.LoggerFrom(ctx)
log.V(1).Info("Gathering org and namespace from buckets")

Expand All @@ -157,17 +166,17 @@ func addOrgAndNamespaceToBucket(ctx context.Context, buckets exoscalev1.BucketLi
bucketDetail := BucketDetail{
BucketName: bucket.Spec.ForProvider.BucketName,
}
if organization, exist := bucket.ObjectMeta.Labels[service.OrganizationLabel]; exist {
bucketDetail.Organization = organization
} else {
// cannot get organization from bucket
log.Info("Organization label is missing in bucket, skipping...",
"label", service.OrganizationLabel,
"bucket", bucket.Name)
continue
}
if namespace, exist := bucket.ObjectMeta.Labels[service.NamespaceLabel]; exist {
organization, ok := namespaces[namespace]
if !ok {
// cannot find namespace in namespace list
log.Info("Namespace not found in namespace list, skipping...",
"namespace", namespace,
"bucket", bucket.Name)
continue
}
bucketDetail.Namespace = namespace
bucketDetail.Organization = organization
} else {
// cannot get namespace from bucket
log.Info("Namespace label is missing in bucket, skipping...",
Expand All @@ -183,3 +192,22 @@ func addOrgAndNamespaceToBucket(ctx context.Context, buckets exoscalev1.BucketLi
}
return bucketDetails
}

func fetchNamespaceWithOrganizationMap(ctx context.Context, k8sclient client.Client) (map[string]string, error) {
log := ctrl.LoggerFrom(ctx)
list := &corev1.NamespaceList{}
if err := k8sclient.List(ctx, list); err != nil {
return nil, fmt.Errorf("cannot get namespace list: %w", err)
}

namespaces := map[string]string{}
for _, ns := range list.Items {
org, ok := ns.Labels[service.OrganizationLabel]
if !ok {
log.Info("Organization label not found in namespace", "namespace", ns.Name)
continue
}
namespaces[ns.Name] = org
}
return namespaces, nil
}
14 changes: 11 additions & 3 deletions pkg/service/sos/objectstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,10 @@ func TestObjectStorage_GetAggregated(t *testing.T) {
}
}

func TestObjectStorage_AadOrgAndNamespaceToBucket(t *testing.T) {
func TestObjectStorage_addOrgAndNamespaceToBucket(t *testing.T) {
tests := map[string]struct {
givenBucketList exoscalev1.BucketList
givenNamespaces map[string]string
expectedBucketDetails []BucketDetail
}{
"GivenBucketListFromExoscale_WhenOrgAndNamespaces_ThenExpectBucketDetailObjects": {
Expand All @@ -123,6 +124,12 @@ func TestObjectStorage_AadOrgAndNamespaceToBucket(t *testing.T) {
createBucket("bucket-5", "theta", "orgC"),
},
},
givenNamespaces: map[string]string{
"alpha": "orgA",
"beta": "orgB",
"omega": "orgB",
"theta": "orgC",
},
expectedBucketDetails: []BucketDetail{
createBucketDetail("bucket-1", "alpha", "orgA"),
createBucketDetail("bucket-2", "beta", "orgB"),
Expand All @@ -139,6 +146,7 @@ func TestObjectStorage_AadOrgAndNamespaceToBucket(t *testing.T) {
createBucket("bucket-3", "", ""),
},
},
givenNamespaces: map[string]string{},
expectedBucketDetails: []BucketDetail{},
},
}
Expand All @@ -148,10 +156,10 @@ func TestObjectStorage_AadOrgAndNamespaceToBucket(t *testing.T) {
ctx := context.Background()

// When
bucketDetails := addOrgAndNamespaceToBucket(ctx, tc.givenBucketList)
bucketDetails := addOrgAndNamespaceToBucket(ctx, tc.givenBucketList, tc.givenNamespaces)

// Then
assert.Equal(t, tc.expectedBucketDetails, bucketDetails)
assert.ElementsMatch(t, tc.expectedBucketDetails, bucketDetails)
})
}
}
Expand Down

0 comments on commit 8bcf772

Please sign in to comment.