Skip to content

Commit

Permalink
Expose init offset and schedule metrics for ConsumerGroup reconciler
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi committed Aug 21, 2023
1 parent c402ae9 commit da76e75
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 24 deletions.
2 changes: 1 addition & 1 deletion control-plane/pkg/reconciler/consumergroup/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"knative.dev/eventing-kafka-broker/control-plane/pkg/security"
)

func (r Reconciler) newAuthConfigOption(ctx context.Context, cg *kafkainternals.ConsumerGroup) (kafka.ConfigOption, error) {
func (r *Reconciler) newAuthConfigOption(ctx context.Context, cg *kafkainternals.ConsumerGroup) (kafka.ConfigOption, error) {
var secret *corev1.Secret

if hasSecretSpecConfig(cg.Spec.Template.Spec.Auth) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"knative.dev/eventing-kafka-broker/control-plane/pkg/autoscaler"
)

func (r Reconciler) autoscalerDefaultsFromConfigMap(ctx context.Context, configMapName string) (*autoscaler.AutoscalerConfig, error) {
func (r *Reconciler) autoscalerDefaultsFromConfigMap(ctx context.Context, configMapName string) (*autoscaler.AutoscalerConfig, error) {

Check warning on line 28 in control-plane/pkg/reconciler/consumergroup/autoscaler_config.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/autoscaler_config.go#L28

Added line #L28 was not covered by tests
cm, err := r.KubeClient.CoreV1().ConfigMaps(r.SystemNamespace).Get(ctx, configMapName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error while retrieving the %s config map in namespace %s: %+v", configMapName, r.SystemNamespace, err)
Expand Down
100 changes: 85 additions & 15 deletions control-plane/pkg/reconciler/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ import (
"fmt"
"math"
"sort"
"time"

"github.com/Shopify/sarama"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand All @@ -36,7 +40,9 @@ import (
"k8s.io/utils/pointer"
eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
"knative.dev/pkg/apis"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/reconciler"

sources "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1"
Expand All @@ -60,8 +66,38 @@ import (
var (
ErrNoSubscriberURI = errors.New("no subscriber URI resolved")
ErrNoDeadLetterSinkURI = errors.New("no dead letter sink URI resolved")

scheduleLatencyStat = stats.Int64("schedule_latency", "Latency of consumer group schedule operations", stats.UnitMilliseconds)
// scheduleDistribution defines the bucket boundaries for the histogram of schedule latency metric.
// Bucket boundaries are 10ms, 100ms, 1s, 10s, 30s and 60s.
scheduleDistribution = view.Distribution(10, 100, 1000, 10000, 30000, 60000)

initializeOffsetsLatencyStat = stats.Int64("initialize_offsets_latency", "Latency of consumer group offsets initialization operations", stats.UnitMilliseconds)
// initializeOffsetsDistribution defines the bucket boundaries for the histogram of initialize offsets latency metric.
// Bucket boundaries are 10ms, 100ms, 1s, 10s, 30s and 60s.
initializeOffsetsDistribution = view.Distribution(10, 100, 1000, 10000, 30000, 60000)
)

func init() {
views := []*view.View{
{
Description: "Latency of consumer group schedule operations",
TagKeys: []tag.Key{controller.NamespaceTagKey},
Measure: scheduleLatencyStat,
Aggregation: scheduleDistribution,
},
{
Description: "Latency of consumer group offsets initialization operations",
TagKeys: []tag.Key{controller.NamespaceTagKey},
Measure: initializeOffsetsLatencyStat,
Aggregation: initializeOffsetsDistribution,
},
}
if err := view.Register(views...); err != nil {
panic(err)

Check warning on line 97 in control-plane/pkg/reconciler/consumergroup/consumergroup.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/consumergroup.go#L97

Added line #L97 was not covered by tests
}
}

type Scheduler struct {
scheduler.Scheduler
SchedulerConfig
Expand Down Expand Up @@ -103,7 +139,7 @@ type Reconciler struct {
DeleteConsumerGroupMetadataCounter *counter.Counter
}

func (r Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {
func (r *Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {
if err := r.reconcileInitialOffset(ctx, cg); err != nil {
return cg.MarkInitializeOffsetFailed("InitializeOffset", err)
}
Expand Down Expand Up @@ -150,7 +186,7 @@ func (r Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.Consum
return nil
}

func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {
func (r *Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.ConsumerGroup) reconciler.Event {

cg.Spec.Replicas = pointer.Int32(0)
err := r.schedule(ctx, cg) //de-schedule placements
Expand All @@ -159,7 +195,7 @@ func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consume
cg.Status.Placements = nil

// return an error to 1. update the status. 2. not clear the finalizer
return errors.New("placement list was not empty")
return fmt.Errorf("failed to unschedule consumer group: %w", err)

Check warning on line 198 in control-plane/pkg/reconciler/consumergroup/consumergroup.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/consumergroup.go#L198

Added line #L198 was not covered by tests
}

// Get consumers associated with the ConsumerGroup.
Expand All @@ -185,7 +221,7 @@ func (r Reconciler) FinalizeKind(ctx context.Context, cg *kafkainternals.Consume
return nil
}

func (r Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
saramaSecurityOption, err := r.newAuthConfigOption(ctx, cg)
if err != nil {
return fmt.Errorf("failed to create config options for Kafka cluster auth: %w", err)
Expand Down Expand Up @@ -213,7 +249,7 @@ func (r Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkain
return nil
}

func (r Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {

// Get consumers associated with the ConsumerGroup.
existingConsumers, err := r.ConsumerLister.Consumers(cg.GetNamespace()).List(labels.SelectorFromSet(cg.Spec.Selector))
Expand Down Expand Up @@ -242,7 +278,7 @@ func (r Reconciler) reconcileConsumers(ctx context.Context, cg *kafkainternals.C
return nil
}

func (r Reconciler) reconcileConsumersInPlacement(ctx context.Context, cg *kafkainternals.ConsumerGroup, pc ConsumersPerPlacement) error {
func (r *Reconciler) reconcileConsumersInPlacement(ctx context.Context, cg *kafkainternals.ConsumerGroup, pc ConsumersPerPlacement) error {

placement := *pc.Placement
consumers := pc.Consumers
Expand Down Expand Up @@ -297,7 +333,7 @@ func (r Reconciler) reconcileConsumersInPlacement(ctx context.Context, cg *kafka
return nil
}

func (r Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.ConsumerGroup, placement eventingduckv1alpha1.Placement) error {
func (r *Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.ConsumerGroup, placement eventingduckv1alpha1.Placement) error {
c := cg.ConsumerFromTemplate()

c.Name = r.NameGenerator.GenerateName(cg.GetName() + "-")
Expand All @@ -310,7 +346,7 @@ func (r Reconciler) createConsumer(ctx context.Context, cg *kafkainternals.Consu
return nil
}

func (r Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainternals.Consumer) error {
func (r *Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainternals.Consumer) error {
dOpts := metav1.DeleteOptions{
Preconditions: &metav1.Preconditions{UID: &consumer.UID},
}
Expand All @@ -321,7 +357,10 @@ func (r Reconciler) finalizeConsumer(ctx context.Context, consumer *kafkainterna
return nil
}

func (r Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
startTime := time.Now()
defer recordScheduleLatency(ctx, cg, startTime)

statefulSetScheduler := r.SchedulerFunc(cg.GetUserFacingResourceRef().Kind)

// Ensure Contract configmaps are created before scheduling to avoid having pending pods due to missing
Expand All @@ -348,7 +387,7 @@ type ConsumersPerPlacement struct {
Consumers []*kafkainternals.Consumer
}

func (r Reconciler) joinConsumersByPlacement(placements []eventingduckv1alpha1.Placement, consumers []*kafkainternals.Consumer) []ConsumersPerPlacement {
func (r *Reconciler) joinConsumersByPlacement(placements []eventingduckv1alpha1.Placement, consumers []*kafkainternals.Consumer) []ConsumersPerPlacement {
placementConsumers := make([]ConsumersPerPlacement, 0, int(math.Max(float64(len(placements)), float64(len(consumers)))))

// Group consumers by Pod bind.
Expand Down Expand Up @@ -403,7 +442,7 @@ func (r Reconciler) joinConsumersByPlacement(placements []eventingduckv1alpha1.P
return placementConsumers
}

func (r Reconciler) propagateStatus(cg *kafkainternals.ConsumerGroup) (*apis.Condition, error) {
func (r *Reconciler) propagateStatus(cg *kafkainternals.ConsumerGroup) (*apis.Condition, error) {
consumers, err := r.ConsumerLister.Consumers(cg.GetNamespace()).List(labels.SelectorFromSet(cg.Spec.Selector))
if err != nil {
return nil, fmt.Errorf("failed to list consumers for selector %+v: %w", cg.Spec.Selector, err)
Expand Down Expand Up @@ -435,7 +474,10 @@ func (r Reconciler) propagateStatus(cg *kafkainternals.ConsumerGroup) (*apis.Con
return condition, nil
}

func (r Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
startTime := time.Now()
defer recordInitializeOffsetsLatency(ctx, cg, startTime)

if cg.Spec.Template.Spec.Delivery == nil || cg.Spec.Template.Spec.Delivery.InitialOffset == sources.OffsetEarliest {
return nil
}
Expand Down Expand Up @@ -479,7 +521,7 @@ func (r Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkainterna
return nil
}

func (r Reconciler) reconcileKedaObjects(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {
func (r *Reconciler) reconcileKedaObjects(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {

Check warning on line 524 in control-plane/pkg/reconciler/consumergroup/consumergroup.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/consumergroup.go#L524

Added line #L524 was not covered by tests
var triggerAuthentication *kedav1alpha1.TriggerAuthentication
var secret *corev1.Secret

Expand Down Expand Up @@ -620,7 +662,7 @@ func (r *Reconciler) reconcileSecret(ctx context.Context, expectedSecret *corev1
return nil
}

func (r Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler Scheduler) error {
func (r *Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler Scheduler) error {
selector := labels.SelectorFromSet(map[string]string{"app": scheduler.StatefulSetName})
pods, err := r.PodLister.
Pods(r.SystemNamespace).
Expand All @@ -644,7 +686,7 @@ func (r Reconciler) ensureContractConfigmapsExist(ctx context.Context, scheduler
return nil
}

func (r Reconciler) ensureContractConfigMapExists(ctx context.Context, p *corev1.Pod, name string) error {
func (r *Reconciler) ensureContractConfigMapExists(ctx context.Context, p *corev1.Pod, name string) error {
// Check if ConfigMap exists in lister cache
_, err := r.ConfigMapLister.ConfigMaps(r.SystemNamespace).Get(name)
// ConfigMap already exists, return
Expand Down Expand Up @@ -684,3 +726,31 @@ var (
_ consumergroup.Interface = &Reconciler{}
_ consumergroup.Finalizer = &Reconciler{}
)

func recordScheduleLatency(ctx context.Context, cg *kafkainternals.ConsumerGroup, startTime time.Time) {
func() {
ctx, err := tag.New(
ctx,
tag.Insert(controller.NamespaceTagKey, cg.Namespace),
)
if err != nil {
return
}

Check warning on line 738 in control-plane/pkg/reconciler/consumergroup/consumergroup.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/consumergroup.go#L737-L738

Added lines #L737 - L738 were not covered by tests

metrics.Record(ctx, scheduleLatencyStat.M(time.Since(startTime).Milliseconds()))
}()
}

func recordInitializeOffsetsLatency(ctx context.Context, cg *kafkainternals.ConsumerGroup, startTime time.Time) {
func() {
ctx, err := tag.New(
ctx,
tag.Insert(controller.NamespaceTagKey, cg.Namespace),
)
if err != nil {
return
}

Check warning on line 752 in control-plane/pkg/reconciler/consumergroup/consumergroup.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/consumergroup.go#L751-L752

Added lines #L751 - L752 were not covered by tests

metrics.Record(ctx, initializeOffsetsLatencyStat.M(time.Since(startTime).Milliseconds()))
}()
}
Original file line number Diff line number Diff line change
Expand Up @@ -1644,7 +1644,7 @@ func TestReconcileKind(t *testing.T) {
_, exampleConfig := cm.ConfigMapsFromTestFile(t, configapis.FlagsConfigName)
store.OnConfigChanged(exampleConfig)

r := Reconciler{
r := &Reconciler{
SchedulerFunc: func(s string) Scheduler {
ss := row.OtherTestData[testSchedulerKey].(scheduler.Scheduler)
return Scheduler{
Expand Down Expand Up @@ -1787,7 +1787,7 @@ func TestReconcileKindNoAutoscaler(t *testing.T) {

ctx, _ = kedaclient.With(ctx)

r := Reconciler{
r := &Reconciler{
SchedulerFunc: func(s string) Scheduler {
ss := row.OtherTestData[testSchedulerKey].(scheduler.Scheduler)
return Scheduler{
Expand Down
9 changes: 6 additions & 3 deletions control-plane/pkg/reconciler/consumergroup/evictor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ type evictor struct {
// newEvictor creates a new evictor.
//
// fields are additional logger fields to be attached to the evictor logger.
func newEvictor(ctx context.Context, fields ...zap.Field) evictor {
return evictor{
func newEvictor(ctx context.Context, fields ...zap.Field) *evictor {
return &evictor{
ctx: ctx,
kubeClient: kubeclient.Get(ctx),
InternalsClient: kafkainternalsclient.Get(ctx).InternalV1alpha1(),
Expand All @@ -60,7 +60,7 @@ func newEvictor(ctx context.Context, fields ...zap.Field) evictor {
}
}

func (e evictor) evict(pod *corev1.Pod, vpod scheduler.VPod, from *eventingduckv1alpha1.Placement) error {
func (e *evictor) evict(pod *corev1.Pod, vpod scheduler.VPod, from *eventingduckv1alpha1.Placement) error {
key := vpod.GetKey()

logger := e.logger.
Expand Down Expand Up @@ -124,6 +124,9 @@ func (e *evictor) disablePodScheduling(logger *zap.Logger, pod *corev1.Pod) erro
_, err := e.kubeClient.CoreV1().
Pods(pod.GetNamespace()).
Update(e.ctx, pod, metav1.UpdateOptions{})
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
return fmt.Errorf("failed to update pod %s/%s: %w", pod.GetNamespace(), pod.GetName(), err)
}
Expand Down
2 changes: 1 addition & 1 deletion control-plane/pkg/reconciler/consumergroup/evictor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestEvictorEvictPodNotFound(t *testing.T) {
e := newEvictor(ctx)
err := e.evict(pod, cg, placement)

require.NotNil(t, err)
require.Nil(t, err)
}
func TestEvictorEvictConsumerGroupNotFound(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
github.com/google/gofuzz v1.2.0
github.com/hashicorp/go-cleanhttp v0.5.2
github.com/kedacore/keda/v2 v2.8.1
go.opencensus.io v0.24.0
knative.dev/eventing v0.38.1-0.20230815095940-29ac3eee64a4
knative.dev/hack v0.0.0-20230815012940-044c02b7a447
knative.dev/pkg v0.0.0-20230815132840-4f651e092853
Expand Down Expand Up @@ -133,7 +134,6 @@ require (
github.com/wavesoftware/go-ensure v1.0.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/stringprep v1.0.3 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/automaxprocs v1.4.0 // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/mod v0.12.0 // indirect
Expand Down

0 comments on commit da76e75

Please sign in to comment.